chore(blockbuilder): cleanup (#15730)

pull/15804/head
Owen Diehl authored 4 months ago, committed by GitHub
parent 848c31fce9
commit f0404a3934
  1. pkg/blockbuilder/builder/builder.go (87 lines changed)
  2. pkg/blockbuilder/scheduler/priority_queue.go (8 lines changed)
  3. pkg/blockbuilder/scheduler/queue.go (417 lines changed)
  4. pkg/blockbuilder/scheduler/queue_test.go (379 lines changed)
  5. pkg/blockbuilder/scheduler/scheduler.go (215 lines changed)
  6. pkg/blockbuilder/scheduler/scheduler_test.go (19 lines changed)
  7. pkg/blockbuilder/scheduler/status.go (33 lines changed)
  8. pkg/blockbuilder/scheduler/status_preview_test.go (2 lines changed)
  9. pkg/blockbuilder/scheduler/status_test.go (41 lines changed)
  10. pkg/blockbuilder/scheduler/strategy.go (19 lines changed)
  11. pkg/blockbuilder/scheduler/strategy_test.go (123 lines changed)
  12. pkg/blockbuilder/types/job.go (5 lines changed)
  13. pkg/kafka/partition/offset_manager.go (72 lines changed)
  14. pkg/loki/modules.go (1 line changed)

@ -2,7 +2,6 @@ package builder
import (
"context"
"errors"
"flag"
"fmt"
"math"
@ -14,6 +13,7 @@ import (
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kgo"
"golang.org/x/sync/errgroup"
@ -247,7 +247,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin
BuilderID: workerID,
})
if err != nil {
return false, err
return false, errors.Wrap(err, "requesting job")
}
if !resp.OK {
@ -274,8 +274,9 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin
Job: job,
Success: true,
}
if _, err = i.processJob(ctx, c, job, logger); err != nil {
level.Error(i.logger).Log("msg", "failed to process job", "err", err)
if processErr := i.processJob(ctx, c, job, logger); processErr != nil {
level.Error(i.logger).Log("msg", "failed to process job", "err", processErr)
err = errors.Wrap(processErr, "processing job")
completion.Success = false
}
@ -301,7 +302,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin
return true, err
}
func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (err error) {
level.Debug(logger).Log("msg", "beginning job")
start := time.Now()
@ -327,7 +328,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
1,
func(ctx context.Context) error {
lastOffset, err = i.loadRecords(ctx, c, job.Partition(), job.Offsets(), inputCh)
return err
return errors.Wrap(err, "loading records")
},
func(ctx context.Context) error {
level.Debug(logger).Log(
@ -365,7 +366,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
cut, err := appender.Append(ctx, input)
if err != nil {
level.Error(logger).Log("msg", "failed to append records", "err", err)
return err
return errors.Wrap(err, "appending records")
}
for _, chk := range cut {
@ -392,7 +393,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
// once we're done appending, cut all remaining chunks.
chks, err := appender.CutRemainingChunks(ctx)
if err != nil {
return err
return errors.Wrap(err, "cutting remaining chunks")
}
for _, chk := range chks {
@ -429,7 +430,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
if err != nil {
level.Error(logger).Log("msg", "failed to flush chunk", "err", err)
i.metrics.chunksFlushFailures.Inc()
return
return res, errors.Wrap(err, "flushing chunk")
}
appender.reportFlushedChunkStatistics(chk)
@ -445,6 +446,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
err = indexer.Append(chk.UserID, chk.Metric, chk.ChunkRef.Fingerprint, index.ChunkMetas{meta})
if err != nil {
level.Error(logger).Log("msg", "failed to append chunk to index", "err", err)
return res, errors.Wrap(err, "appending chunk to index")
}
return
@ -463,7 +465,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
"err", err,
)
if err != nil {
return 0, err
return errors.Wrap(err, "running pipeline")
}
var (
@ -474,7 +476,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
built, err := indexer.create(ctx, nodeName, tableRanges)
if err != nil {
level.Error(logger).Log("msg", "failed to build index", "err", err)
return 0, err
return errors.Wrap(err, "building index")
}
u := newUploader(i.objStore)
@ -486,7 +488,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
"msg", "failed to upload tsdb",
"path", db.id.Path(),
)
return
return res, errors.Wrap(err, "uploading tsdb")
}
level.Debug(logger).Log(
@ -495,14 +497,10 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
)
return
}); err != nil {
return 0, err
return errors.Wrap(err, "running pipeline")
}
}
if lastOffset <= job.Offsets().Min {
return lastOffset, nil
}
// log success
level.Info(logger).Log(
"msg", "successfully processed job",
@ -511,10 +509,10 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
"records", lastOffset-job.Offsets().Min,
)
return lastOffset, nil
return nil
}
func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (lastConsumedOffset int64, err error) {
// Use NoResetOffset to avoid resetting the offset to the beginning of the partition when the requested offset is out of range.
// This could happen if the requested records are already outside the retention period. We should fail the job in such cases, leaving the scheduler to make a decision.
c.AddConsumePartitions(map[string]map[int32]kgo.Offset{
@ -524,22 +522,48 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partition
i.kafkaCfg.Topic: {partitionID},
})
lastConsumedOffset = offsets.Min - 1
var (
lastConsumedOffset = offsets.Min - 1
lastSeenOffset = offsets.Min - 1
boff = backoff.New(ctx, i.cfg.Backoff)
boff = backoff.New(ctx, i.cfg.Backoff)
consecutiveTimeouts = 0
maxTimeouts = 3
)
for lastSeenOffset < offsets.Max && boff.Ongoing() {
// since offsets.Max is exclusive, it can point to an offset that doesn't exist,
// so we only poll until we reach the end of the records we need to process (offsets.Max-1).
// this prevents us from polling indefinitely for records that don't exist.
for lastConsumedOffset < offsets.Max-1 && boff.Ongoing() {
if consecutiveTimeouts >= maxTimeouts {
return lastConsumedOffset, fmt.Errorf("exceeded maximum consecutive timeouts (%d) while polling records", maxTimeouts)
}
if err := context.Cause(ctx); err != nil {
return 0, err
return lastConsumedOffset, err
}
fs := c.PollRecords(ctx, int(offsets.Max-lastConsumedOffset))
// TODO: better error handling for non-retryable errors
// we don't have to iterate over all errors since we only fetch a single partition
// Add timeout for each poll operation
pollCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
fs := c.PollRecords(pollCtx, int(offsets.Max-lastConsumedOffset))
cancel()
if err := fs.Err(); err != nil {
level.Error(i.logger).Log("msg", "failed to poll records", "err", err)
if errors.Is(err, context.DeadlineExceeded) {
level.Warn(i.logger).Log(
"msg", "timeout polling records",
"partition", partitionID,
"last_offset", lastConsumedOffset,
"target_offset", offsets.Max,
)
boff.Wait()
consecutiveTimeouts++
continue
}
level.Error(i.logger).Log(
"msg", "failed to poll records",
"err", err,
"partition", partitionID,
"last_offset", lastConsumedOffset,
)
boff.Wait()
continue
}
@ -551,11 +575,11 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partition
// Reset backoff on successful poll
boff.Reset()
consecutiveTimeouts = 0
converted := make([]AppendInput, 0, fs.NumRecords())
for iter := fs.RecordIter(); !iter.Done(); {
record := iter.Next()
lastSeenOffset = record.Offset
if record.Offset >= offsets.Max {
level.Debug(i.logger).Log("msg", "record offset exceeds job max offset. stop processing", "record offset", record.Offset, "max offset", offsets.Max)
break
@ -563,10 +587,11 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partition
stream, labels, err := i.decoder.Decode(record.Value)
if err != nil {
return 0, fmt.Errorf("failed to decode record: %w", err)
return lastConsumedOffset, errors.Wrap(err, "failed to decode record")
}
lastConsumedOffset = record.Offset
if len(stream.Entries) == 0 {
continue
}
@ -587,7 +612,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partition
select {
case ch <- converted:
case <-ctx.Done():
return 0, ctx.Err()
return lastConsumedOffset, ctx.Err()
}
}
}

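The loadRecords rewrite above bounds how long the builder will wait on an unresponsive partition: each poll gets its own 30-second context, consecutive timeouts are counted and reset on a successful fetch, and three in a row abort the job (the real code additionally waits on a dskit backoff between attempts). A minimal, self-contained sketch of that pattern follows; the poll function and record counts are stand-ins, not the actual kgo client:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollOnce stands in for a single fetch call (e.g. kgo.Client.PollRecords).
type pollOnce func(ctx context.Context) (int, error)

// consumeWithTimeoutBudget polls until want records have been seen, tolerating
// at most maxTimeouts consecutive per-poll timeouts before giving up.
func consumeWithTimeoutBudget(ctx context.Context, poll pollOnce, want, maxTimeouts int) (int, error) {
	var total, consecutiveTimeouts int
	for total < want {
		if consecutiveTimeouts >= maxTimeouts {
			return total, fmt.Errorf("exceeded %d consecutive timeouts", maxTimeouts)
		}
		pollCtx, cancel := context.WithTimeout(ctx, 30*time.Second) // per-poll budget
		n, err := poll(pollCtx)
		cancel()
		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) {
				consecutiveTimeouts++ // only timeouts count toward the budget
				continue
			}
			return total, err // non-timeout errors bubble up
		}
		consecutiveTimeouts = 0 // reset on any successful poll
		total += n
	}
	return total, nil
}

func main() {
	fakePoll := func(ctx context.Context) (int, error) {
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-time.After(10 * time.Millisecond):
			return 25, nil // pretend a batch of 25 records arrived
		}
	}
	n, err := consumeWithTimeoutBudget(context.Background(), fakePoll, 100, 3)
	fmt.Println(n, err) // 100 <nil>
}
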
@ -58,6 +58,14 @@ func (pq *PriorityQueue[K, V]) Pop() (V, bool) {
return it.value, true
}
func (pq *PriorityQueue[K, V]) Peek() (V, bool) {
if pq.Len() == 0 {
var zero V
return zero, false
}
return pq.h.heap[0].value, true
}
// Lookup returns the item with the given key if it exists.
func (pq *PriorityQueue[K, V]) Lookup(k K) (V, bool) {
if it, ok := pq.m[k]; ok {

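The only change to the priority queue is the new Peek, which returns the highest-priority item without removing it; the rewritten Dequeue in queue.go relies on it to inspect the head first and only remove the job as part of a successful transition to in-progress. A small stand-alone illustration of that peek-then-pop shape on a container/heap-backed max-heap (a stand-in for the generic PriorityQueue, not its actual implementation):

package main

import (
	"container/heap"
	"fmt"
)

// jobHeap is a tiny stand-in for the scheduler's generic PriorityQueue:
// higher values sort to the top, like higher-priority jobs.
type jobHeap []int

func (h jobHeap) Len() int           { return len(h) }
func (h jobHeap) Less(i, j int) bool { return h[i] > h[j] } // max-heap
func (h jobHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *jobHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *jobHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// peek mirrors the new Peek method: return the head without removing it.
func peek(h jobHeap) (int, bool) {
	if len(h) == 0 {
		return 0, false
	}
	return h[0], true
}

func main() {
	h := &jobHeap{}
	for _, p := range []int{1, 3, 2} {
		heap.Push(h, p)
	}
	// Peek-then-pop: inspect the best candidate first, commit to removing it
	// only once the caller knows the follow-up (e.g. a state transition) works.
	if top, ok := peek(*h); ok {
		fmt.Println("head priority:", top) // 3, still queued
	}
	fmt.Println("popped:", heap.Pop(h)) // 3, now removed
}
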
@ -21,26 +21,6 @@ const (
defaultCompletedJobsCapacity = 100
)
// JobWithMetadata wraps a job with additional metadata for tracking its lifecycle
type JobWithMetadata struct {
*types.Job
Priority int
Status types.JobStatus
StartTime time.Time
UpdateTime time.Time
}
// NewJobWithMetadata creates a new JobWithMetadata instance
func NewJobWithMetadata(job *types.Job, priority int) *JobWithMetadata {
return &JobWithMetadata{
Job: job,
Priority: priority,
Status: types.JobStatusPending,
UpdateTime: time.Now(),
}
}
type jobQueueMetrics struct {
pending prometheus.Gauge
inProgress prometheus.Gauge
@ -64,6 +44,26 @@ func newJobQueueMetrics(r prometheus.Registerer) *jobQueueMetrics {
}
}
// JobWithMetadata wraps a job with additional metadata for tracking its lifecycle
type JobWithMetadata struct {
*types.Job
Priority int
Status types.JobStatus
StartTime time.Time
UpdateTime time.Time
}
// NewJobWithMetadata creates a new JobWithMetadata instance
func NewJobWithMetadata(job *types.Job, priority int) *JobWithMetadata {
return &JobWithMetadata{
Job: job,
Priority: priority,
Status: types.JobStatusPending,
UpdateTime: time.Now(),
}
}
type JobQueueConfig struct {
LeaseExpiryCheckInterval time.Duration `yaml:"lease_expiry_check_interval"`
LeaseDuration time.Duration `yaml:"lease_duration"`
@ -74,7 +74,7 @@ func (cfg *JobQueueConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LeaseDuration, "jobqueue.lease-duration", 10*time.Minute, "Duration after which a job lease is considered expired if the scheduler receives no updates from builders about the job. Expired jobs are re-enqueued")
}
// JobQueue manages the queue of pending jobs and tracks their state.
// JobQueue is a thread-safe implementation of a job queue with state tracking
type JobQueue struct {
cfg JobQueueConfig
@ -88,25 +88,21 @@ type JobQueue struct {
metrics *jobQueueMetrics
}
// NewJobQueue creates a new job queue instance
// NewJobQueue creates a new JobQueue instance
func NewJobQueue(cfg JobQueueConfig, logger log.Logger, reg prometheus.Registerer) *JobQueue {
return &JobQueue{
cfg: cfg,
logger: logger,
pending: NewPriorityQueue(
func(a, b *JobWithMetadata) bool {
return a.Priority > b.Priority // Higher priority first
},
func(j *JobWithMetadata) string { return j.ID() },
),
cfg: cfg,
pending: NewPriorityQueue(priorityComparator, jobIDExtractor),
inProgress: make(map[string]*JobWithMetadata),
completed: NewCircularBuffer[*JobWithMetadata](defaultCompletedJobsCapacity),
statusMap: make(map[string]types.JobStatus),
logger: logger,
metrics: newJobQueueMetrics(reg),
}
}
// RunLeaseExpiryChecker periodically checks for expired job leases and requeues them
func (q *JobQueue) RunLeaseExpiryChecker(ctx context.Context) {
ticker := time.NewTicker(q.cfg.LeaseExpiryCheckInterval)
defer ticker.Stop()
@ -124,227 +120,211 @@ func (q *JobQueue) RunLeaseExpiryChecker(ctx context.Context) {
}
}
// requeueExpiredJobs checks for jobs that have exceeded their lease duration and requeues them
func (q *JobQueue) requeueExpiredJobs() error {
// First collect expired jobs while holding the lock
q.mu.Lock()
defer q.mu.Unlock()
var multiErr error
var expiredJobs []*JobWithMetadata
for id, job := range q.inProgress {
if time.Since(job.UpdateTime) > q.cfg.LeaseDuration {
level.Warn(q.logger).Log("msg", "job lease expired. requeuing", "job", id, "update_time", job.UpdateTime, "now", time.Now())
// complete the job with expired status and re-enqueue
delete(q.inProgress, id)
q.metrics.inProgress.Dec()
level.Warn(q.logger).Log("msg", "job lease expired, will requeue", "job", id, "update_time", job.UpdateTime, "now", time.Now())
expiredJobs = append(expiredJobs, job)
}
}
q.mu.Unlock()
job.Status = types.JobStatusExpired
q.addToCompletedBuffer(job)
// Then requeue them without holding the lock
var multiErr error
for _, job := range expiredJobs {
// First try to transition from in-progress to expired
ok, err := q.TransitionState(job.ID(), types.JobStatusInProgress, types.JobStatusExpired)
if err != nil {
level.Error(q.logger).Log("msg", "failed to mark job as expired", "job", job.ID(), "err", err)
multiErr = errors.Join(multiErr, fmt.Errorf("failed to mark job %s as expired: %w", job.ID(), err))
continue
}
if !ok {
// Job is no longer in progress, someone else must have handled it
level.Debug(q.logger).Log("msg", "job no longer in progress, skipping expiry", "job", job.ID())
continue
}
if err := q.enqueueLockLess(job.Job, job.Priority); err != nil {
level.Error(q.logger).Log("msg", "failed to requeue expired job", "job", id, "err", err)
multiErr = errors.Join(multiErr, err)
}
// Then re-enqueue it
_, _, err = q.TransitionAny(job.ID(), types.JobStatusPending, func() (*JobWithMetadata, error) {
return NewJobWithMetadata(job.Job, job.Priority), nil
})
if err != nil {
level.Error(q.logger).Log("msg", "failed to requeue expired job", "job", job.ID(), "err", err)
multiErr = errors.Join(multiErr, fmt.Errorf("failed to requeue expired job %s: %w", job.ID(), err))
}
}
return multiErr
}
// Exists checks if a job exists in any state and returns its status
func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
q.mu.RLock()
defer q.mu.RUnlock()
x, ok := q.existsLockLess(job.ID())
if !ok {
return types.JobStatusUnknown, false
}
return x.Status, ok
// priorityComparator compares two jobs by priority (higher priority first)
func priorityComparator(a, b *JobWithMetadata) bool {
return a.Priority > b.Priority
}
func (q *JobQueue) existsLockLess(id string) (*JobWithMetadata, bool) {
status, ok := q.statusMap[id]
if !ok {
return nil, false
}
switch status {
case types.JobStatusPending:
return q.pending.Lookup(id)
case types.JobStatusInProgress:
res, ok := q.inProgress[id]
return res, ok
case types.JobStatusComplete:
return q.completed.Lookup(func(jwm *JobWithMetadata) bool {
return jwm.ID() == id
})
default:
return nil, false
}
// jobIDExtractor extracts the job ID from a JobWithMetadata
func jobIDExtractor(j *JobWithMetadata) string {
return j.ID()
}
// Enqueue adds a job to the pending queue with the given priority
func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
// TransitionState attempts to transition a job from one specific state to another
func (q *JobQueue) TransitionState(jobID string, from, to types.JobStatus) (bool, error) {
q.mu.Lock()
defer q.mu.Unlock()
return q.enqueueLockLess(job, priority)
}
currentStatus, exists := q.statusMap[jobID]
if !exists {
return false, fmt.Errorf("job %s not found", jobID)
}
func (q *JobQueue) enqueueLockLess(job *types.Job, priority int) error {
// Check if job already exists
if status, exists := q.statusMap[job.ID()]; exists && status != types.JobStatusExpired {
return fmt.Errorf("job %s already exists with status %v", job.ID(), status)
if currentStatus != from {
return false, fmt.Errorf("job %s is in state %s, not %s", jobID, currentStatus, from)
}
jobMeta := NewJobWithMetadata(job, priority)
q.pending.Push(jobMeta)
q.statusMap[job.ID()] = types.JobStatusPending
q.metrics.pending.Inc()
return nil
return q.transitionLockLess(jobID, to)
}
// Dequeue removes and returns the highest priority job from the pending queue
func (q *JobQueue) Dequeue() (*types.Job, bool) {
// TransitionAny transitions a job from any state to the specified state
func (q *JobQueue) TransitionAny(jobID string, to types.JobStatus, createFn func() (*JobWithMetadata, error)) (prevStatus types.JobStatus, found bool, err error) {
q.mu.Lock()
defer q.mu.Unlock()
jobMeta, ok := q.pending.Pop()
if !ok {
return nil, false
}
q.metrics.pending.Dec()
currentStatus, exists := q.statusMap[jobID]
// If the job isn't found or has already finished, create a new job
if finished := currentStatus.IsFinished(); !exists || finished {
// exception:
// we're just moving one finished type to another; no need to re-enqueue
if finished && to.IsFinished() {
q.statusMap[jobID] = to
if j, found := q.completed.Lookup(
func(jwm *JobWithMetadata) bool {
return jwm.ID() == jobID
},
); found {
j.Status = to
j.UpdateTime = time.Now()
}
return currentStatus, true, nil
}
// Update metadata for in-progress state
jobMeta.Status = types.JobStatusInProgress
jobMeta.StartTime = time.Now()
jobMeta.UpdateTime = jobMeta.StartTime
if createFn == nil {
return types.JobStatusUnknown, false, fmt.Errorf("job %s not found and no creation function provided", jobID)
}
q.inProgress[jobMeta.ID()] = jobMeta
q.statusMap[jobMeta.ID()] = types.JobStatusInProgress
q.metrics.inProgress.Inc()
if finished {
level.Debug(q.logger).Log("msg", "creating a copy of already-completed job", "id", jobID, "from", currentStatus, "to", to)
}
return jobMeta.Job, true
}
job, err := createFn()
if err != nil {
return types.JobStatusUnknown, false, fmt.Errorf("failed to create job %s: %w", jobID, err)
}
// GetInProgressJob retrieves a job that is currently being processed
func (q *JobQueue) GetInProgressJob(id string) (*types.Job, time.Time, bool) {
q.mu.RLock()
defer q.mu.RUnlock()
// temporarily mark as pending so we can transition it to the target state
q.statusMap[jobID] = types.JobStatusPending
q.pending.Push(job)
q.metrics.pending.Inc()
level.Debug(q.logger).Log("msg", "created new job", "id", jobID, "status", types.JobStatusPending)
if jobMeta, ok := q.inProgress[id]; ok {
return jobMeta.Job, jobMeta.StartTime, true
if _, err := q.transitionLockLess(jobID, to); err != nil {
return types.JobStatusUnknown, false, err
}
return types.JobStatusUnknown, false, nil
}
return nil, time.Time{}, false
_, err = q.transitionLockLess(jobID, to)
return currentStatus, true, err
}
// RemoveInProgress removes a job from the in-progress map
func (q *JobQueue) RemoveInProgress(id string) {
q.mu.Lock()
defer q.mu.Unlock()
// transitionLockLess performs the actual state transition (must be called with lock held)
func (q *JobQueue) transitionLockLess(jobID string, to types.JobStatus) (bool, error) {
from := q.statusMap[jobID]
if from == to {
return false, nil
}
delete(q.inProgress, id)
q.metrics.inProgress.Dec()
}
var job *JobWithMetadata
// MarkComplete moves a job from in-progress to completed with the given status
func (q *JobQueue) MarkComplete(id string, status types.JobStatus) {
q.mu.Lock()
defer q.mu.Unlock()
// Remove from current state
switch from {
case types.JobStatusPending:
if j, exists := q.pending.Remove(jobID); exists {
job = j
q.metrics.pending.Dec()
}
case types.JobStatusInProgress:
if j, exists := q.inProgress[jobID]; exists {
job = j
delete(q.inProgress, jobID)
q.metrics.inProgress.Dec()
}
}
jobMeta, ok := q.existsLockLess(id)
if !ok {
level.Error(q.logger).Log("msg", "failed to mark job as complete", "job", id, "status", status)
return
if job == nil {
return false, fmt.Errorf("job %s not found in its supposed state %s", jobID, from)
}
switch jobMeta.Status {
case types.JobStatusInProgress:
// update & remove from in progress
delete(q.inProgress, id)
q.metrics.inProgress.Dec()
// Add to new state
job.Status = to
job.UpdateTime = time.Now()
q.statusMap[jobID] = to
switch to {
case types.JobStatusPending:
_, ok := q.pending.Remove(id)
if !ok {
level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", id)
}
q.metrics.pending.Dec()
case types.JobStatusComplete:
level.Info(q.logger).Log("msg", "job is already complete, ignoring", "job", id)
return
q.pending.Push(job)
q.metrics.pending.Inc()
case types.JobStatusInProgress:
q.inProgress[jobID] = job
q.metrics.inProgress.Inc()
job.StartTime = job.UpdateTime
case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired:
q.completed.Push(job)
q.metrics.completed.WithLabelValues(to.String()).Inc()
delete(q.statusMap, jobID) // remove from status map so we don't grow indefinitely
default:
level.Error(q.logger).Log("msg", "unknown job status, cannot mark as complete", "job", id, "status", status)
return
return false, fmt.Errorf("invalid target state: %s", to)
}
jobMeta.Status = status
jobMeta.UpdateTime = time.Now()
q.addToCompletedBuffer(jobMeta)
level.Debug(q.logger).Log("msg", "transitioned job state", "id", jobID, "from", from, "to", to)
return true, nil
}
// add it to the completed buffer, removing any evicted job from the statusMap
func (q *JobQueue) addToCompletedBuffer(jobMeta *JobWithMetadata) {
removal, evicted := q.completed.Push(jobMeta)
if evicted {
delete(q.statusMap, removal.ID())
}
// Exists checks if a job exists and returns its current status
func (q *JobQueue) Exists(jobID string) (types.JobStatus, bool) {
q.mu.RLock()
defer q.mu.RUnlock()
q.statusMap[jobMeta.ID()] = jobMeta.Status
q.metrics.completed.WithLabelValues(jobMeta.Status.String()).Inc()
status, exists := q.statusMap[jobID]
return status, exists
}
// SyncJob registers a job as in-progress or updates its UpdateTime if already in progress
func (q *JobQueue) SyncJob(jobID string, job *types.Job) {
// Dequeue removes and returns the highest priority pending job
func (q *JobQueue) Dequeue() (*types.Job, bool) {
q.mu.Lock()
defer q.mu.Unlock()
// Helper function to create a new job
registerInProgress := func() {
// Job does not exist; add it as in-progress
now := time.Now()
jobMeta := NewJobWithMetadata(job, DefaultPriority)
jobMeta.StartTime = now
jobMeta.UpdateTime = now
jobMeta.Status = types.JobStatusInProgress
q.inProgress[jobID] = jobMeta
q.statusMap[jobID] = types.JobStatusInProgress
q.metrics.inProgress.Inc()
}
jobMeta, ok := q.existsLockLess(jobID)
job, ok := q.pending.Peek()
if !ok {
registerInProgress()
return
return nil, false
}
switch jobMeta.Status {
case types.JobStatusPending:
// Job already pending, move to in-progress
_, ok := q.pending.Remove(jobID)
if !ok {
level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", jobID)
}
jobMeta.Status = types.JobStatusInProgress
q.metrics.pending.Dec()
q.metrics.inProgress.Inc()
case types.JobStatusInProgress:
case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired:
// Job already completed, re-enqueue a new one
level.Warn(q.logger).Log("msg", "job already completed, re-enqueuing", "job", jobID, "status", jobMeta.Status)
registerInProgress()
return
default:
registerInProgress()
return
_, err := q.transitionLockLess(job.ID(), types.JobStatusInProgress)
if err != nil {
level.Error(q.logger).Log("msg", "failed to transition dequeued job to in progress", "id", job.ID(), "err", err)
return nil, false
}
jobMeta.UpdateTime = time.Now()
q.inProgress[jobID] = jobMeta
q.statusMap[jobID] = types.JobStatusInProgress
return job.Job, true
}
// ListPendingJobs returns a list of all pending jobs
func (q *JobQueue) ListPendingJobs() []JobWithMetadata {
q.mu.RLock()
defer q.mu.RUnlock()
@ -352,9 +332,9 @@ func (q *JobQueue) ListPendingJobs() []JobWithMetadata {
// return copies of the jobs since they can change after the lock is released
jobs := make([]JobWithMetadata, 0, q.pending.Len())
for _, j := range q.pending.List() {
cpy := *j.Job
jobs = append(jobs, JobWithMetadata{
// Job is immutable, no need to make a copy
Job: j.Job,
Job: &cpy, // force copy
Priority: j.Priority,
Status: j.Status,
StartTime: j.StartTime,
@ -365,6 +345,7 @@ func (q *JobQueue) ListPendingJobs() []JobWithMetadata {
return jobs
}
// ListInProgressJobs returns a list of all in-progress jobs
func (q *JobQueue) ListInProgressJobs() []JobWithMetadata {
q.mu.RLock()
defer q.mu.RUnlock()
@ -372,9 +353,9 @@ func (q *JobQueue) ListInProgressJobs() []JobWithMetadata {
// return copies of the jobs since they can change after the lock is released
jobs := make([]JobWithMetadata, 0, len(q.inProgress))
for _, j := range q.inProgress {
cpy := *j.Job
jobs = append(jobs, JobWithMetadata{
// Job is immutable, no need to make a copy
Job: j.Job,
Job: &cpy, // force copy
Priority: j.Priority,
Status: j.Status,
StartTime: j.StartTime,
@ -384,14 +365,54 @@ func (q *JobQueue) ListInProgressJobs() []JobWithMetadata {
return jobs
}
// ListCompletedJobs returns a list of completed jobs
func (q *JobQueue) ListCompletedJobs() []JobWithMetadata {
q.mu.RLock()
defer q.mu.RUnlock()
jobs := make([]JobWithMetadata, 0, q.completed.Len())
q.completed.Range(func(job *JobWithMetadata) bool {
jobs = append(jobs, *job)
cpy := *job.Job
jobs = append(jobs, JobWithMetadata{
Job: &cpy, // force copy
Priority: job.Priority,
Status: job.Status,
StartTime: job.StartTime,
UpdateTime: job.UpdateTime,
})
return true
})
return jobs
}
// UpdatePriority updates the priority of a pending job. If the job is not pending,
// returns false to indicate the update was not performed.
func (q *JobQueue) UpdatePriority(id string, priority int) bool {
q.mu.Lock()
defer q.mu.Unlock()
// Check if job is still pending
if job, ok := q.pending.Lookup(id); ok {
// nit: we're technically already updating the prio via reference,
// but that's fine -- we may refactor this eventually to have 3 generic types: (key, value, priority) where value implements a `Priority() T` method.
job.Priority = priority
return q.pending.UpdatePriority(id, job)
}
// Job is no longer pending (might be in progress, completed, etc)
return false
}
// Ping updates the last-updated timestamp of a job and returns whether it was found.
// This is useful for keeping jobs alive and preventing lease expiry.
func (q *JobQueue) Ping(id string) bool {
q.mu.Lock()
defer q.mu.Unlock()
if job, ok := q.inProgress[id]; ok {
job.UpdateTime = time.Now()
return true
}
return false
}

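Taken together, the queue rewrite funnels every state change through two entry points: TransitionState for a checked from-to move, and TransitionAny for "get the job into this state, creating it via createFn if it is unknown". A hedged sketch of the lifecycle those methods imply, written as if it lived inside the scheduler package and using only the signatures shown in the diff; the config values are arbitrary:

package scheduler

import (
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// exampleLifecycle walks one job through the states the new API exposes:
// created as pending, leased via Dequeue, kept alive with Ping, and finally
// moved to a terminal state with a checked transition.
func exampleLifecycle() error {
	q := NewJobQueue(JobQueueConfig{
		LeaseDuration:            10 * time.Minute,
		LeaseExpiryCheckInterval: time.Minute,
	}, log.NewNopLogger(), prometheus.NewRegistry())

	job := types.NewJob(1, types.Offsets{Min: 0, Max: 1000})

	// Create-or-move into pending; createFn only runs if the job is unknown.
	if _, _, err := q.TransitionAny(job.ID(), types.JobStatusPending, func() (*JobWithMetadata, error) {
		return NewJobWithMetadata(job, DefaultPriority), nil
	}); err != nil {
		return err
	}

	// Dequeue peeks the highest-priority pending job and transitions it to in-progress.
	leased, ok := q.Dequeue()
	if !ok {
		return nil
	}

	// Builders heartbeat via Ping so the lease expiry checker does not requeue the job.
	_ = q.Ping(leased.ID())

	// Checked transition: errors if the job is not currently in-progress.
	_, err := q.TransitionState(leased.ID(), types.JobStatusInProgress, types.JobStatusComplete)
	return err
}
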
@ -5,267 +5,208 @@ import (
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)
var testQueueCfg = JobQueueConfig{}
func TestJobQueue_SyncJob(t *testing.T) {
t.Run("non-existent to in-progress", func(t *testing.T) {
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
jobID := job.ID()
func newTestQueue() *JobQueue {
return NewJobQueue(
JobQueueConfig{
LeaseDuration: 10 * time.Minute,
LeaseExpiryCheckInterval: time.Minute,
},
log.NewNopLogger(),
prometheus.NewRegistry(),
)
}
beforeSync := time.Now()
q.SyncJob(jobID, job)
afterSync := time.Now()
func TestJobQueue_TransitionState(t *testing.T) {
q := newTestQueue()
offsets := types.Offsets{Min: 0, Max: 100}
job := types.NewJob(1, offsets)
jobID := job.ID()
// Verify job is in in-progress map
jobMeta, ok := q.inProgress[jobID]
require.True(t, ok, "job should be in in-progress map")
require.Equal(t, types.JobStatusInProgress, jobMeta.Status)
require.True(t, jobMeta.StartTime.After(beforeSync) || jobMeta.StartTime.Equal(beforeSync))
require.True(t, jobMeta.StartTime.Before(afterSync) || jobMeta.StartTime.Equal(afterSync))
// Create initial job
prevState, existed, err := q.TransitionAny(jobID, types.JobStatusPending, func() (*JobWithMetadata, error) {
return NewJobWithMetadata(job, 1), nil
})
require.NoError(t, err)
require.False(t, existed)
require.Equal(t, types.JobStatusUnknown, prevState)
t.Run("pending to in-progress", func(t *testing.T) {
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
// Test valid transition
ok, err := q.TransitionState(jobID, types.JobStatusPending, types.JobStatusInProgress)
require.NoError(t, err)
require.True(t, ok)
// Start with pending job
err := q.Enqueue(job, DefaultPriority)
require.NoError(t, err)
// Test invalid transition (wrong 'from' state)
ok, err = q.TransitionState(jobID, types.JobStatusPending, types.JobStatusComplete)
require.Error(t, err)
require.False(t, ok)
beforeSync := time.Now()
q.SyncJob(job.ID(), job)
afterSync := time.Now()
// Test non-existent job
ok, err = q.TransitionState("non-existent", types.JobStatusPending, types.JobStatusInProgress)
require.Error(t, err)
require.False(t, ok)
}
// Verify job moved from pending to in-progress
_, ok := q.pending.Lookup(job.ID())
require.False(t, ok, "job should not be in pending queue")
func TestJobQueue_TransitionAny(t *testing.T) {
q := newTestQueue()
offsets := types.Offsets{Min: 0, Max: 100}
job := types.NewJob(1, offsets)
jobID := job.ID()
jobMeta, ok := q.inProgress[job.ID()]
require.True(t, ok, "job should be in in-progress map")
require.Equal(t, types.JobStatusInProgress, jobMeta.Status)
require.True(t, jobMeta.UpdateTime.After(beforeSync) || jobMeta.UpdateTime.Equal(beforeSync))
require.True(t, jobMeta.UpdateTime.Before(afterSync) || jobMeta.UpdateTime.Equal(afterSync))
// Test creation of new job
prevState, existed, err := q.TransitionAny(jobID, types.JobStatusPending, func() (*JobWithMetadata, error) {
return NewJobWithMetadata(job, 1), nil
})
require.NoError(t, err)
require.False(t, existed)
require.Equal(t, types.JobStatusUnknown, prevState)
t.Run("already in-progress", func(t *testing.T) {
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
// First sync to put in in-progress
q.SyncJob(job.ID(), job)
firstUpdate := q.inProgress[job.ID()].UpdateTime
// Test transition of existing job
prevState, existed, err = q.TransitionAny(jobID, types.JobStatusInProgress, nil)
require.NoError(t, err)
require.True(t, existed)
require.Equal(t, types.JobStatusPending, prevState)
time.Sleep(time.Millisecond) // Ensure time difference
beforeSecondSync := time.Now()
q.SyncJob(job.ID(), job)
afterSecondSync := time.Now()
// Test transition with nil createFn for non-existent job
prevState, existed, err = q.TransitionAny("non-existent", types.JobStatusPending, nil)
require.Error(t, err)
require.False(t, existed)
require.Equal(t, types.JobStatusUnknown, prevState)
jobMeta := q.inProgress[job.ID()]
require.True(t, jobMeta.UpdateTime.After(firstUpdate), "UpdateTime should be updated")
require.True(t, jobMeta.UpdateTime.After(beforeSecondSync) || jobMeta.UpdateTime.Equal(beforeSecondSync))
require.True(t, jobMeta.UpdateTime.Before(afterSecondSync) || jobMeta.UpdateTime.Equal(afterSecondSync))
})
// Verify final status
status, exists := q.Exists(jobID)
require.True(t, exists)
require.Equal(t, types.JobStatusInProgress, status)
}
func TestJobQueue_MarkComplete(t *testing.T) {
t.Run("in-progress to complete", func(t *testing.T) {
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
// Start with in-progress job
q.SyncJob(job.ID(), job)
beforeComplete := time.Now()
q.MarkComplete(job.ID(), types.JobStatusComplete)
afterComplete := time.Now()
// Verify job moved to completed buffer
var foundJob *JobWithMetadata
q.completed.Lookup(func(j *JobWithMetadata) bool {
if j.ID() == job.ID() {
foundJob = j
return true
}
return false
})
require.NotNil(t, foundJob, "job should be in completed buffer")
require.Equal(t, types.JobStatusComplete, foundJob.Status)
require.True(t, foundJob.UpdateTime.After(beforeComplete) || foundJob.UpdateTime.Equal(beforeComplete))
require.True(t, foundJob.UpdateTime.Before(afterComplete) || foundJob.UpdateTime.Equal(afterComplete))
// Verify removed from in-progress
_, ok := q.inProgress[job.ID()]
require.False(t, ok, "job should not be in in-progress map")
})
t.Run("pending to complete", func(t *testing.T) {
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
// Start with pending job
err := q.Enqueue(job, DefaultPriority)
require.NoError(t, err)
q.MarkComplete(job.ID(), types.JobStatusComplete)
// Verify job not in pending
_, ok := q.pending.Lookup(job.ID())
require.False(t, ok, "job should not be in pending queue")
func TestJobQueue_Dequeue(t *testing.T) {
q := newTestQueue()
// Add jobs with different priorities
jobs := []struct {
partition int32
priority int
}{
{1, 1},
{2, 3},
{3, 2},
}
// Verify job in completed buffer
var foundJob *JobWithMetadata
q.completed.Lookup(func(j *JobWithMetadata) bool {
if j.ID() == job.ID() {
foundJob = j
return true
}
return false
var jobIDs []string
for _, j := range jobs {
job := types.NewJob(j.partition, types.Offsets{Min: 0, Max: 100})
jobIDs = append(jobIDs, job.ID())
_, _, err := q.TransitionAny(job.ID(), types.JobStatusPending, func() (*JobWithMetadata, error) {
return NewJobWithMetadata(job, j.priority), nil
})
require.NotNil(t, foundJob, "job should be in completed buffer")
require.Equal(t, types.JobStatusComplete, foundJob.Status)
})
t.Run("non-existent job", func(t *testing.T) {
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
logger := &testLogger{t: t}
q.logger = logger
require.NoError(t, err)
}
q.MarkComplete("non-existent", types.JobStatusComplete)
// Should log error but not panic
})
// Dequeue should return highest priority job first
job, ok := q.Dequeue()
require.True(t, ok)
require.Equal(t, jobIDs[1], job.ID()) // Priority 3
t.Run("already completed job", func(t *testing.T) {
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
logger := &testLogger{t: t}
q.logger = logger
job, ok = q.Dequeue()
require.True(t, ok)
require.Equal(t, jobIDs[2], job.ID()) // Priority 2
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
q.SyncJob(job.ID(), job)
q.MarkComplete(job.ID(), types.JobStatusComplete)
job, ok = q.Dequeue()
require.True(t, ok)
require.Equal(t, jobIDs[0], job.ID()) // Priority 1
// Try to complete again
q.MarkComplete(job.ID(), types.JobStatusComplete)
// Should log error but not panic
})
// Queue should be empty now
job, ok = q.Dequeue()
require.False(t, ok)
require.Nil(t, job)
}
func TestJobQueue_Enqueue(t *testing.T) {
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
beforeComplete := time.Now()
err := q.Enqueue(job, 1)
afterComplete := time.Now()
require.NoError(t, err)
status, ok := q.Exists(job)
require.True(t, ok, "job should exist")
require.Equal(t, types.JobStatusPending, status)
// Verify job in pending queue
foundJob, ok := q.pending.Lookup(job.ID())
require.True(t, ok, "job should be in pending queue")
require.Equal(t, job, foundJob.Job)
require.Equal(t, 1, foundJob.Priority)
require.True(t, foundJob.StartTime.IsZero())
require.True(t, foundJob.UpdateTime.After(beforeComplete) || foundJob.UpdateTime.Equal(beforeComplete))
require.True(t, foundJob.UpdateTime.Before(afterComplete) || foundJob.UpdateTime.Equal(afterComplete))
// allow enqueueing of job with same ID if expired
job2 := types.NewJob(2, types.Offsets{Min: 100, Max: 200})
q.statusMap[job2.ID()] = types.JobStatusExpired
func TestJobQueue_Lists(t *testing.T) {
q := newTestQueue()
// Add jobs in different states
jobStates := map[int32]types.JobStatus{
1: types.JobStatusPending,
2: types.JobStatusPending,
3: types.JobStatusInProgress,
4: types.JobStatusInProgress,
5: types.JobStatusComplete,
6: types.JobStatusFailed,
}
err = q.Enqueue(job2, 2)
require.NoError(t, err)
for partition, status := range jobStates {
job := types.NewJob(partition, types.Offsets{Min: 0, Max: 100})
_, _, err := q.TransitionAny(job.ID(), types.JobStatusPending, func() (*JobWithMetadata, error) {
return NewJobWithMetadata(job, 1), nil
})
require.NoError(t, err)
status, ok = q.Exists(job2)
require.True(t, ok, "job should exist")
require.Equal(t, types.JobStatusPending, status)
if status != types.JobStatusPending {
_, _, err = q.TransitionAny(job.ID(), status, nil)
require.NoError(t, err)
}
}
// Verify job2 in pending queue
foundJob, ok = q.pending.Lookup(job2.ID())
require.True(t, ok, "job2 should be in pending queue")
require.Equal(t, job2, foundJob.Job)
require.Equal(t, 2, foundJob.Priority)
// Test ListPendingJobs
pending := q.ListPendingJobs()
require.Len(t, pending, 2)
for _, j := range pending {
require.Equal(t, types.JobStatusPending, j.Status)
}
// do not allow enqueueing of job with same ID if not expired
job3 := types.NewJob(3, types.Offsets{Min: 120, Max: 230})
q.statusMap[job3.ID()] = types.JobStatusInProgress
// Test ListInProgressJobs
inProgress := q.ListInProgressJobs()
require.Len(t, inProgress, 2)
for _, j := range inProgress {
require.Equal(t, types.JobStatusInProgress, j.Status)
}
err = q.Enqueue(job3, DefaultPriority)
require.Error(t, err)
// Test ListCompletedJobs
completed := q.ListCompletedJobs()
require.Len(t, completed, 2) // completed and failed
for _, j := range completed {
require.Contains(t, []types.JobStatus{types.JobStatusComplete, types.JobStatusFailed}, j.Status)
}
}
func TestJobQueue_RequeueExpiredJobs(t *testing.T) {
q := NewJobQueue(JobQueueConfig{
LeaseDuration: 5 * time.Minute,
}, log.NewNopLogger(), nil)
job1 := &JobWithMetadata{
Job: types.NewJob(1, types.Offsets{Min: 100, Max: 200}),
Priority: 1,
Status: types.JobStatusInProgress,
StartTime: time.Now().Add(-time.Hour),
UpdateTime: time.Now().Add(-time.Minute),
}
// expired job
job2 := &JobWithMetadata{
Job: types.NewJob(2, types.Offsets{Min: 300, Max: 400}),
Priority: 2,
Status: types.JobStatusInProgress,
StartTime: time.Now().Add(-time.Hour),
UpdateTime: time.Now().Add(-6 * time.Minute),
func TestJobQueue_LeaseExpiry(t *testing.T) {
cfg := JobQueueConfig{
LeaseDuration: 100 * time.Millisecond,
LeaseExpiryCheckInterval: 10 * time.Millisecond,
}
q := NewJobQueue(cfg, log.NewNopLogger(), prometheus.NewRegistry())
q.inProgress[job1.ID()] = job1
q.inProgress[job2.ID()] = job2
q.statusMap[job1.ID()] = types.JobStatusInProgress
q.statusMap[job2.ID()] = types.JobStatusInProgress
// Create and start a job
job := types.NewJob(1, types.Offsets{Min: 0, Max: 100})
jobID := job.ID()
beforeRequeue := time.Now()
err := q.requeueExpiredJobs()
_, _, err := q.TransitionAny(jobID, types.JobStatusPending, func() (*JobWithMetadata, error) {
return NewJobWithMetadata(job, 1), nil
})
require.NoError(t, err)
status, ok := q.statusMap[job1.ID()]
// Move to in progress
dequeued, ok := q.Dequeue()
require.True(t, ok)
require.Equal(t, types.JobStatusInProgress, status)
require.Equal(t, jobID, dequeued.ID())
got, ok := q.inProgress[job1.ID()]
require.True(t, ok)
require.Equal(t, job1, got)
// Wait for lease to expire
time.Sleep(200 * time.Millisecond)
err = q.requeueExpiredJobs()
require.NoError(t, err)
status, ok = q.statusMap[job2.ID()]
require.True(t, ok)
// Verify the job is back in pending
status, exists := q.Exists(jobID)
require.True(t, exists)
require.Equal(t, types.JobStatusPending, status)
got, ok = q.pending.Lookup(job2.ID())
require.True(t, ok)
require.Equal(t, job2.Job, got.Job)
require.Equal(t, types.JobStatusPending, got.Status)
require.Equal(t, job2.Priority, got.Priority)
require.True(t, got.StartTime.IsZero())
require.True(t, got.UpdateTime.After(beforeRequeue) || got.UpdateTime.Equal(beforeRequeue))
require.Equal(t, 1, q.completed.Len())
got, ok = q.completed.Pop()
require.True(t, ok)
job2.Status = types.JobStatusExpired
require.Equal(t, job2, got)
}
// testLogger implements log.Logger for testing
type testLogger struct {
t *testing.T
}
func (l *testLogger) Log(keyvals ...interface{}) error {
l.t.Log(keyvals...)
return nil
// Check that it went through expired state
completed := q.ListCompletedJobs()
require.Len(t, completed, 1)
require.Equal(t, types.JobStatusExpired, completed[0].Status)
}

@ -15,7 +15,6 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/kafka/partition"
@ -97,7 +96,7 @@ type BlockScheduler struct {
}
// NewScheduler creates a new scheduler instance
func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) (*BlockScheduler, error) {
func NewScheduler(cfg Config, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) (*BlockScheduler, error) {
// pin the fallback offset at the time of scheduler creation to ensure the planner uses the same fallback offset on subsequent runs.
// without this, the planner would create unaligned jobs when the partition has no commits so far.
fallbackOffsetMillis := int64(partition.KafkaStartOffset)
@ -119,7 +118,7 @@ func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetMan
offsetManager: offsetManager,
logger: logger,
metrics: NewMetrics(r),
queue: queue,
queue: NewJobQueue(cfg.JobQueueConfig, logger, r),
fallbackOffsetMillis: fallbackOffsetMillis,
}
@ -133,6 +132,7 @@ func (s *BlockScheduler) running(ctx context.Context) error {
}
go s.queue.RunLeaseExpiryChecker(ctx)
go s.publishLagLoop(ctx)
ticker := time.NewTicker(s.cfg.Interval)
for {
@ -149,14 +149,6 @@ func (s *BlockScheduler) running(ctx context.Context) error {
}
func (s *BlockScheduler) runOnce(ctx context.Context) error {
lag, err := s.offsetManager.GroupLag(ctx, s.fallbackOffsetMillis)
if err != nil {
level.Error(s.logger).Log("msg", "failed to get group lag", "err", err)
return err
}
s.publishLagMetrics(lag)
// TODO(owen-d): parallelize work within a partition
// TODO(owen-d): skip small jobs unless they're stale,
// e.g. a partition which is no longer being written to shouldn't be orphaned
@ -166,58 +158,99 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {
}
level.Info(s.logger).Log("msg", "planned jobs", "count", len(jobs))
// TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID
for _, job := range jobs {
// TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID
added, status, err := s.idempotentEnqueue(job)
level.Info(s.logger).Log(
"msg", "enqueued job",
"added", added,
"status", status.String(),
"err", err,
"partition", job.Job.Partition(),
"num_offsets", job.Offsets().Max-job.Offsets().Min,
)
// if we've either added or encountered an error, move on; we're done this cycle
if added || err != nil {
continue
if err := s.handlePlannedJob(job); err != nil {
level.Error(s.logger).Log("msg", "failed to handle planned job", "err", err)
}
}
// scheduler is aware of incoming job; handling depends on status
switch status {
case types.JobStatusPending:
level.Debug(s.logger).Log(
"msg", "job is pending, updating priority",
"old_priority", job.Priority,
)
s.queue.pending.UpdatePriority(job.Job.ID(), job)
case types.JobStatusInProgress:
level.Debug(s.logger).Log(
"msg", "job is in progress, ignoring",
)
case types.JobStatusComplete:
// shouldn't happen
level.Debug(s.logger).Log(
"msg", "job is complete, ignoring",
)
default:
level.Error(s.logger).Log(
"msg", "job has unknown status, ignoring",
"status", status,
)
return nil
}
func (s *BlockScheduler) publishLagLoop(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lag, err := s.offsetManager.GroupLag(ctx, s.fallbackOffsetMillis)
if err != nil {
level.Error(s.logger).Log("msg", "failed to get group lag for metric publishing", "err", err)
}
s.publishLagMetrics(lag)
case <-ctx.Done():
return
}
}
}
func (s *BlockScheduler) handlePlannedJob(job *JobWithMetadata) error {
logger := log.With(
s.logger,
"job", job.Job.ID(),
"partition", job.Job.Partition(),
"num_offsets", job.Offsets().Max-job.Offsets().Min,
)
status, exists := s.queue.Exists(job.Job.ID())
if !exists {
// New job, enqueue it
_, _, err := s.queue.TransitionAny(job.Job.ID(), types.JobStatusPending, func() (*JobWithMetadata, error) {
return job, nil
})
if err != nil {
level.Error(logger).Log("msg", "failed to enqueue new job", "err", err)
return err
}
level.Info(logger).Log("msg", "enqueued new job")
return nil
}
// Job exists, handle based on current status
switch status {
case types.JobStatusComplete:
// Job already completed successfully, no need to replan
level.Debug(logger).Log("msg", "job already completed successfully, skipping")
case types.JobStatusPending:
// Update priority of pending job
if updated := s.queue.UpdatePriority(job.Job.ID(), job.Priority); !updated {
// Job is no longer pending, skip it for this iteration
level.Debug(logger).Log("msg", "job no longer pending, skipping priority update")
return nil
}
level.Debug(logger).Log("msg", "updated priority of pending job", "new_priority", job.Priority)
case types.JobStatusFailed, types.JobStatusExpired:
// Re-enqueue failed or expired jobs
_, _, err := s.queue.TransitionAny(job.Job.ID(), types.JobStatusPending, func() (*JobWithMetadata, error) {
return job, nil
})
if err != nil {
level.Error(logger).Log("msg", "failed to re-enqueue failed/expired job", "err", err)
return err
}
level.Info(logger).Log("msg", "re-enqueued failed/expired job", "status", status)
case types.JobStatusInProgress:
// Job is being worked on, ignore it
level.Debug(logger).Log("msg", "job is in progress, ignoring")
default:
level.Error(logger).Log("msg", "job has unknown status, ignoring", "status", status)
}
return nil
}
func (s *BlockScheduler) publishLagMetrics(lag map[int32]kadm.GroupMemberLag) {
for partition, offsets := range lag {
func (s *BlockScheduler) publishLagMetrics(lag map[int32]partition.Lag) {
for partition, l := range lag {
// useful for scaling builders
s.metrics.lag.WithLabelValues(strconv.Itoa(int(partition))).Set(float64(offsets.Lag))
s.metrics.committedOffset.WithLabelValues(strconv.Itoa(int(partition))).Set(float64(offsets.Commit.At))
s.metrics.lag.WithLabelValues(strconv.Itoa(int(partition))).Set(float64(l.Lag()))
s.metrics.committedOffset.WithLabelValues(strconv.Itoa(int(partition))).Set(float64(l.LastCommittedOffset()))
}
}
@ -231,34 +264,6 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context) (*types.Job, bool, er
}
}
// if added is true, the job was added to the queue, otherwise status is the current status of the job
func (s *BlockScheduler) idempotentEnqueue(job *JobWithMetadata) (added bool, status types.JobStatus, err error) {
logger := log.With(
s.logger,
"job", job.Job.ID(),
"priority", job.Priority,
)
status, ok := s.queue.Exists(job.Job)
// scheduler is unaware of incoming job; enqueue
if !ok {
level.Debug(logger).Log(
"msg", "job does not exist, enqueueing",
)
// enqueue
if err := s.queue.Enqueue(job.Job, job.Priority); err != nil {
level.Error(logger).Log("msg", "failed to enqueue job", "err", err)
return false, types.JobStatusUnknown, err
}
return true, types.JobStatusPending, nil
}
return false, status, nil
}
func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job, success bool) (err error) {
logger := log.With(s.logger, "job", job.ID())
@ -268,8 +273,12 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job,
job.Partition(),
job.Offsets().Max-1, // max is exclusive, so commit max-1
); err == nil {
s.queue.MarkComplete(job.ID(), types.JobStatusComplete)
level.Info(logger).Log("msg", "job completed successfully")
if _, _, transitionErr := s.queue.TransitionAny(job.ID(), types.JobStatusComplete, func() (*JobWithMetadata, error) {
return NewJobWithMetadata(job, DefaultPriority), nil
}); transitionErr != nil {
level.Warn(logger).Log("msg", "failed to mark successful job as complete", "err", transitionErr)
}
// TODO(owen-d): cleaner way to enqueue next job for this partition,
// don't make it part of the response cycle to job completion, etc.
@ -286,29 +295,49 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job,
})
if nextJob < len(jobs) && jobs[nextJob].Job.Partition() == job.Partition() {
_, _, _ = s.idempotentEnqueue(jobs[nextJob])
if err := s.handlePlannedJob(jobs[nextJob]); err != nil {
level.Error(logger).Log("msg", "failed to handle subsequent job", "err", err)
}
}
return nil
}
level.Error(logger).Log("msg", "failed to commit offset", "err", err)
}
level.Error(logger).Log("msg", "job failed, re-enqueuing")
s.queue.MarkComplete(job.ID(), types.JobStatusFailed)
s.queue.pending.Push(
NewJobWithMetadata(
job,
DefaultPriority,
),
)
// mark as failed
prev, found, err := s.queue.TransitionAny(job.ID(), types.JobStatusFailed, func() (*JobWithMetadata, error) {
return NewJobWithMetadata(job, DefaultPriority), nil
})
if err != nil {
level.Error(logger).Log("msg", "failed to mark job failure", "prev", prev, "found", found, "err", err)
} else {
level.Error(logger).Log("msg", "marked job failure", "prev", prev, "found", found)
}
cpy := *job
if err := s.handlePlannedJob(NewJobWithMetadata(&cpy, DefaultPriority)); err != nil {
level.Error(logger).Log("msg", "failed to handle subsequent job", "err", err)
}
return nil
}
func (s *BlockScheduler) HandleSyncJob(_ context.Context, job *types.Job) error {
s.queue.SyncJob(job.ID(), job)
return nil
_, _, err := s.queue.TransitionAny(
job.ID(),
types.JobStatusInProgress,
func() (*JobWithMetadata, error) {
return NewJobWithMetadata(job, DefaultPriority), nil
},
)
// Update last-updated timestamp
_ = s.queue.Ping(job.ID())
if err != nil {
level.Error(s.logger).Log("msg", "failed to sync job", "job", job.ID(), "err", err)
}
return err
}
func (s *BlockScheduler) ServeHTTP(w http.ResponseWriter, req *http.Request) {

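A consequence of this cleanup is that NewScheduler now builds its own JobQueue from cfg.JobQueueConfig instead of accepting one, which is why modules.go and the tests below stopped constructing a queue. A rough sketch of the new wiring, assuming it lives in the scheduler package; the config values are placeholders and any required fields not visible in this diff are omitted:

package scheduler

import (
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/kafka/partition"
)

// newExampleScheduler shows the post-cleanup wiring: the job queue is created
// inside NewScheduler from cfg.JobQueueConfig rather than injected by the caller.
func newExampleScheduler(om partition.OffsetManager) (*BlockScheduler, error) {
	cfg := Config{
		Interval: 5 * time.Minute, // planning interval; value is a placeholder
		Strategy: RecordCountStrategy,
		JobQueueConfig: JobQueueConfig{
			LeaseDuration:            10 * time.Minute,
			LeaseExpiryCheckInterval: time.Minute,
		},
	}
	return NewScheduler(cfg, om, log.NewNopLogger(), prometheus.NewRegistry())
}
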
@ -7,7 +7,6 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/kafka/partition"
@ -27,7 +26,7 @@ type mockOffsetManager struct {
func (m *mockOffsetManager) Topic() string { return m.topic }
func (m *mockOffsetManager) ConsumerGroup() string { return m.consumerGroup }
func (m *mockOffsetManager) GroupLag(_ context.Context, _ int64) (map[int32]kadm.GroupMemberLag, error) {
func (m *mockOffsetManager) GroupLag(_ context.Context, _ int64) (map[int32]partition.Lag, error) {
return nil, nil
}
func (m *mockOffsetManager) FetchLastCommittedOffset(_ context.Context, _ int32) (int64, error) {
@ -41,12 +40,14 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error {
}
func newTestEnv(builderID string) (*testEnv, error) {
queue := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
mockOffsetMgr := &mockOffsetManager{
topic: "test-topic",
consumerGroup: "test-group",
}
scheduler, err := NewScheduler(Config{Strategy: RecordCountStrategy}, queue, mockOffsetMgr, log.NewNopLogger(), prometheus.NewRegistry())
scheduler, err := NewScheduler(Config{
Strategy: RecordCountStrategy,
JobQueueConfig: JobQueueConfig{},
}, mockOffsetMgr, log.NewNopLogger(), prometheus.NewRegistry())
if err != nil {
return nil, err
}
@ -55,11 +56,11 @@ func newTestEnv(builderID string) (*testEnv, error) {
builder := NewWorker(builderID, transport)
return &testEnv{
queue: queue,
queue: scheduler.queue,
scheduler: scheduler,
transport: transport,
builder: builder,
}, err
}, nil
}
func TestScheduleAndProcessJob(t *testing.T) {
@ -72,7 +73,7 @@ func TestScheduleAndProcessJob(t *testing.T) {
// Create and enqueue a test job
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
err = env.queue.Enqueue(job, 100)
err = env.scheduler.handlePlannedJob(NewJobWithMetadata(job, 100))
if err != nil {
t.Fatalf("failed to enqueue job: %v", err)
}
@ -137,11 +138,11 @@ func TestMultipleBuilders(t *testing.T) {
job2 := types.NewJob(2, types.Offsets{Min: 300, Max: 400})
// Enqueue jobs
err = env1.queue.Enqueue(job1, 100)
err = env1.scheduler.handlePlannedJob(NewJobWithMetadata(job1, 100))
if err != nil {
t.Fatalf("failed to enqueue job1: %v", err)
}
err = env1.queue.Enqueue(job2, 100)
err = env1.scheduler.handlePlannedJob(NewJobWithMetadata(job2, 100))
if err != nil {
t.Fatalf("failed to enqueue job2: %v", err)
}

@ -8,13 +8,15 @@ import (
"slices"
"time"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/dustin/go-humanize"
)
//go:embed status.gohtml
var defaultPageContent string
var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{
"durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Second).String() },
"offsetsLen": func(min, max int64) int64 { return max - min },
"humanize": humanize.Comma,
}).Parse(defaultPageContent))
type jobQueue interface {
@ -23,10 +25,6 @@ type jobQueue interface {
ListCompletedJobs() []JobWithMetadata
}
type offsetReader interface {
GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error)
}
type partitionInfo struct {
Partition int32
Lag int64
@ -36,11 +34,11 @@ type partitionInfo struct {
type statusPageHandler struct {
jobQueue jobQueue
offsetReader offsetReader
offsetReader OffsetReader
fallbackOffsetMillis int64
}
func newStatusPageHandler(jobQueue jobQueue, offsetReader offsetReader, fallbackOffsetMillis int64) *statusPageHandler {
func newStatusPageHandler(jobQueue jobQueue, offsetReader OffsetReader, fallbackOffsetMillis int64) *statusPageHandler {
return &statusPageHandler{jobQueue: jobQueue, offsetReader: offsetReader, fallbackOffsetMillis: fallbackOffsetMillis}
}
@ -74,16 +72,19 @@ func (h *statusPageHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
CompletedJobs: h.jobQueue.ListCompletedJobs(),
}
for _, partitionOffset := range offsets {
// only include partitions having lag
if partitionOffset.Lag > 0 {
data.PartitionInfo = append(data.PartitionInfo, partitionInfo{
Partition: partitionOffset.Partition,
Lag: partitionOffset.Lag,
EndOffset: partitionOffset.End.Offset,
CommittedOffset: partitionOffset.Commit.At,
})
for partition, l := range offsets {
// only include partitions that have lag and are within retention
lag := l.Lag()
if lag <= 0 {
continue
}
data.PartitionInfo = append(data.PartitionInfo, partitionInfo{
Partition: partition,
Lag: lag,
EndOffset: l.NextAvailableOffset(),
CommittedOffset: l.LastCommittedOffset(),
})
}
slices.SortFunc(data.PartitionInfo, func(a, b partitionInfo) int {
return int(a.Partition - b.Partition)

@ -59,7 +59,7 @@ func TestPreview(t *testing.T) {
},
}
handler := newStatusPageHandler(mockLister, mockReader, time.Hour)
handler := newStatusPageHandler(mockLister, mockReader, int64(time.Hour/time.Millisecond))
// Start local server
server := httptest.NewServer(handler)

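The one-line test change above reflects that fallbackOffsetMillis is a plain millisecond count rather than a time.Duration, hence the explicit conversion. A tiny example of the same conversion:

package main

import (
	"fmt"
	"time"
)

func main() {
	// fallbackOffsetMillis is a millisecond count, not a time.Duration,
	// hence the int64(time.Hour/time.Millisecond) conversion in the test above.
	lookback := time.Hour
	fallbackOffsetMillis := int64(lookback / time.Millisecond)
	fmt.Println(fallbackOffsetMillis) // 3600000
}
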
@ -7,9 +7,8 @@ import (
"testing"
"time"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/kafka/partition"
)
type mockQueueLister struct {
@ -47,25 +46,25 @@ func TestStatusPageHandler_ServeHTTP(t *testing.T) {
}
mockReader := &mockOffsetReader{
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Lag: 10,
Partition: 3,
End: kadm.ListedOffset{Offset: 100},
Commit: kadm.Offset{At: 90},
},
1: {
Lag: 0,
Partition: 1,
End: kadm.ListedOffset{Offset: 100},
Commit: kadm.Offset{At: 100},
},
2: {
Lag: 233,
Partition: 2,
End: kadm.ListedOffset{Offset: 333},
Commit: kadm.Offset{At: 100},
},
groupLag: map[int32]partition.Lag{
0: partition.NewLag(
90, // startOffset (committed offset)
100, // endOffset
90, // committedOffset
10, // rawLag (matches original Lag field)
),
1: partition.NewLag(
100, // startOffset (committed offset)
100, // endOffset
100, // committedOffset
0, // rawLag (matches original Lag field)
),
2: partition.NewLag(
100, // startOffset (committed offset)
333, // endOffset
100, // committedOffset
233, // rawLag (matches original Lag field)
),
},
}

@ -6,14 +6,14 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/kafka/partition"
)
// OffsetReader is an interface to list offsets for all partitions of a topic from Kafka.
type OffsetReader interface {
GroupLag(context.Context, int64) (map[int32]kadm.GroupMemberLag, error)
GroupLag(context.Context, int64) (map[int32]partition.Lag, error)
}
type Planner interface {
@ -59,20 +59,15 @@ func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int,
}
jobs := make([]*JobWithMetadata, 0, len(offsets))
for _, partitionOffset := range offsets {
// 1. kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
// no additional validation is needed here
// 2. committed offset could be behind start offset if we are falling behind retention period.
startOffset := max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset)
endOffset := partitionOffset.End.Offset
for partition, l := range offsets {
// Skip if there's no lag
if startOffset >= endOffset {
if l.Lag() <= 0 {
continue
}
var jobCount int
currentStart := startOffset
currentStart := l.FirstUncommittedOffset()
endOffset := l.NextAvailableOffset()
// Create jobs of size targetRecordCount until we reach endOffset
for currentStart < endOffset {
if maxJobsPerPartition > 0 && jobCount >= maxJobsPerPartition {
@ -87,7 +82,7 @@ func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int,
}
job := NewJobWithMetadata(
types.NewJob(partitionOffset.Partition, types.Offsets{
types.NewJob(partition, types.Offsets{
Min: currentStart,
Max: currentEnd,
}),
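To make the splitting loop above concrete, here is a hedged sketch of how one partition's uncommitted range is cut into end-exclusive [Min,Max) jobs. splitIntoOffsetRanges and the offsets struct are hypothetical stand-ins written for illustration, and capping each chunk with min is an assumption, since the actual currentEnd computation falls outside the hunk shown:

package main

import "fmt"

// offsets mirrors types.Offsets for illustration only: Min inclusive, Max exclusive.
type offsets struct{ Min, Max int64 }

// splitIntoOffsetRanges cuts chunks of at most targetRecordCount offsets
// until nextAvailable (exclusive) is reached, mirroring the loop above.
func splitIntoOffsetRanges(firstUncommitted, nextAvailable, targetRecordCount int64) []offsets {
	var jobs []offsets
	for cur := firstUncommitted; cur < nextAvailable; {
		end := min(cur+targetRecordCount, nextAvailable) // assumed cap on the final chunk
		jobs = append(jobs, offsets{Min: cur, Max: end})
		cur = end
	}
	return jobs
}

func main() {
	// 199 uncommitted offsets with a target of 100 records per job -> two jobs
	fmt.Println(splitIntoOffsetRanges(201, 400, 100)) // [{201 301} {301 400}]
}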

@ -7,16 +7,16 @@ import (
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/kafka/partition"
)
type mockOffsetReader struct {
groupLag map[int32]kadm.GroupMemberLag
groupLag map[int32]partition.Lag
}
func (m *mockOffsetReader) GroupLag(_ context.Context, _ int64) (map[int32]kadm.GroupMemberLag, error) {
func (m *mockOffsetReader) GroupLag(_ context.Context, _ int64) (map[int32]partition.Lag, error) {
return m.groupLag, nil
}
@ -34,22 +34,19 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
recordCount int64
minOffsetsPerJob int
expectedJobs []*JobWithMetadata
groupLag map[int32]kadm.GroupMemberLag
groupLag map[int32]partition.Lag
}{
{
name: "single partition, single job",
recordCount: 100,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
At: 100,
},
End: kadm.ListedOffset{
Offset: 150,
},
Partition: 0,
},
groupLag: map[int32]partition.Lag{
0: partition.NewLag(
50, // startOffset (first offset still in retention)
150, // endOffset
100, // committedOffset
50, // rawLag (endOffset - committedOffset)
),
},
expectedJobs: []*JobWithMetadata{
NewJobWithMetadata(
@ -62,16 +59,13 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
name: "single partition, multiple jobs",
recordCount: 50,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
At: 100,
},
End: kadm.ListedOffset{
Offset: 200,
},
Partition: 0,
},
groupLag: map[int32]partition.Lag{
0: partition.NewLag(
50, // startOffset (first offset still in retention)
200, // endOffset
100, // committedOffset
100, // rawLag (endOffset - committedOffset)
),
},
expectedJobs: []*JobWithMetadata{
NewJobWithMetadata(
@ -88,25 +82,19 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
name: "multiple partitions",
recordCount: 100,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
At: 100,
},
End: kadm.ListedOffset{
Offset: 150,
},
Partition: 0,
},
1: {
Commit: kadm.Offset{
At: 200,
},
End: kadm.ListedOffset{
Offset: 400,
},
Partition: 1,
},
groupLag: map[int32]partition.Lag{
0: partition.NewLag(
100, // startOffset (committed offset)
150, // endOffset
100, // committedOffset
50, // rawLag (endOffset - committedOffset)
),
1: partition.NewLag(
200, // startOffset (committed offset)
400, // endOffset
200, // committedOffset
200, // rawLag (endOffset - committedOffset)
),
},
expectedJobs: []*JobWithMetadata{
NewJobWithMetadata(
@ -127,16 +115,13 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
name: "no lag",
recordCount: 100,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
At: 100,
},
End: kadm.ListedOffset{
Offset: 100,
},
Partition: 0,
},
groupLag: map[int32]partition.Lag{
0: partition.NewLag(
100, // startOffset (committed offset)
100, // endOffset
100, // committedOffset
0, // rawLag (endOffset - committedOffset)
),
},
expectedJobs: nil,
},
@ -144,25 +129,19 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
name: "skip small jobs",
recordCount: 100,
minOffsetsPerJob: 40,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
At: 100,
},
End: kadm.ListedOffset{
Offset: 130, // Only 30 records available, less than minimum
},
Partition: 0,
},
1: {
Commit: kadm.Offset{
At: 200,
},
End: kadm.ListedOffset{
Offset: 300, // 100 records available, more than minimum
},
Partition: 1,
},
groupLag: map[int32]partition.Lag{
0: partition.NewLag(
100, // startOffset (committed offset)
130, // endOffset
100, // committedOffset
30, // rawLag (endOffset - committedOffset)
),
1: partition.NewLag(
200, // startOffset (committed offset)
300, // endOffset
200, // committedOffset
100, // rawLag (endOffset - committedOffset)
),
},
expectedJobs: []*JobWithMetadata{
NewJobWithMetadata(

@ -51,7 +51,12 @@ func (s JobStatus) String() string {
}
}
func (s JobStatus) IsFinished() bool {
return s == JobStatusComplete || s == JobStatusFailed || s == JobStatusExpired
}
// Offsets represents the range of offsets to process
// [Min,Max): Min is inclusive, Max is exclusive
type Offsets struct {
Min int64
Max int64

@ -17,12 +17,59 @@ import (
"github.com/grafana/loki/v3/pkg/kafka/client"
)
// Partition-level metadata in a more easily digestible form than what Kafka provides
type Lag struct {
// First available offset still within retention
startOffset int64
// Exclusive; the next available offset (as of yet unwritten)
endOffset int64
// Last committed offset
committedOffset int64
// rawLag measures how far the most recently committed offset trails the current end offset.
// In special cases it can be positive even when there are no more records to process:
// this happens when the gap between the last committed offset and the current offset has
// fallen out of retention and is therefore unrecoverable.
rawLag int64
}
func NewLag(startOffset, endOffset, committedOffset, rawLag int64) Lag {
return Lag{
startOffset: startOffset,
endOffset: endOffset,
committedOffset: committedOffset,
rawLag: rawLag,
}
}
// FirstUncommittedOffset returns the first offset that has not yet been committed
func (l Lag) FirstUncommittedOffset() int64 {
// committedOffset is the last offset already committed, so processing resumes at the first
// _uncommitted_ offset, but never before the first offset still in retention
return max(l.committedOffset+1, l.startOffset)
}
func (l Lag) LastCommittedOffset() int64 {
return l.committedOffset
}
// NextAvailableOffset returns the next unwritten offset in a partition,
// i.e. the end offset (exclusive)
func (l Lag) NextAvailableOffset() int64 {
return l.endOffset
}
// Lag returns the number of offsets still left to process: the distance between
// the next available offset and the first uncommitted (but available) offset
func (l Lag) Lag() int64 {
return l.endOffset - l.FirstUncommittedOffset()
}
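For orientation, a minimal self-contained sketch of how the new accessors relate, assuming only the NewLag constructor and the methods above; the values mirror the status_test fixture for partition 0:

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/kafka/partition"
)

func main() {
	// startOffset=90, endOffset=100, committedOffset=90, rawLag=10
	l := partition.NewLag(90, 100, 90, 10)

	fmt.Println(l.FirstUncommittedOffset()) // 91: max(committed+1, start)
	fmt.Println(l.NextAvailableOffset())    // 100: exclusive end
	fmt.Println(l.LastCommittedOffset())    // 90
	fmt.Println(l.Lag())                    // 9: offsets 91 through 99 remain
}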
type OffsetManager interface {
Topic() string
ConsumerGroup() string
// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error)
GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error)
FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error)
FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
Commit(ctx context.Context, partition int32, offset int64) error
@ -181,7 +228,7 @@ func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partition
}
// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]kadm.GroupMemberLag, error) {
func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error) {
lag, err := GetGroupLag(ctx, r.adminClient, r.cfg.Topic, r.ConsumerGroup(), fallbackOffsetMillis)
if err != nil {
return nil, err
@ -192,7 +239,26 @@ func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis
return nil, errors.New("no lag found for the topic")
}
return offsets, nil
res := make(map[int32]Lag, len(offsets))
for partition, partitionOffset := range offsets {
res[partition] = Lag{
// 1. kadm.GroupMemberLag contains a valid Commit.At even when the consumer group never
// committed any offset, so no additional validation is needed here.
// 2. The committed offset can be behind the start offset if we have fallen behind the
// retention period.
// Commit.At is the previously-committed offset, so processing starts at the first
// _uncommitted_ offset, but never before the first offset still in retention.
startOffset: max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset),
// endOffset is the next available (as of yet unwritten) offset: this is why jobs are
// treated as end-exclusive, so we don't poll forever on a partition that has no more records.
endOffset: partitionOffset.End.Offset,
committedOffset: partitionOffset.Commit.At,
rawLag: partitionOffset.Lag,
}
}
return res, nil
}
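A hedged sketch of the mapping above for a partition that has fallen behind retention; the numbers are illustrative, not from the patch. The committed offset (50) predates the first offset still in retention (120), so processing resumes at 120 while the raw lag still counts the unrecoverable gap:

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/kafka/partition"
)

func main() {
	commitAt, retentionStart, end := int64(50), int64(120), int64(300)

	// Mirrors the conversion above: startOffset = max(Commit.At+1, Start.Offset).
	l := partition.NewLag(max(commitAt+1, retentionStart), end, commitAt, end-commitAt)

	fmt.Println(l.FirstUncommittedOffset()) // 120: resume at the first offset still in retention
	fmt.Println(l.Lag())                    // 180: only what is actually recoverable
	// rawLag here would be 250 (300-50), counting offsets already lost to retention.
}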
// Commit commits an offset to the consumer group

@ -1869,7 +1869,6 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
s, err := blockscheduler.NewScheduler(
t.Cfg.BlockScheduler,
blockscheduler.NewJobQueue(t.Cfg.BlockScheduler.JobQueueConfig, logger, prometheus.DefaultRegisterer),
offsetManager,
logger,
prometheus.DefaultRegisterer,
