diff --git a/pkg/kafka/partition/offset_manager.go b/pkg/kafka/partition/offset_manager.go
index 7fa446fdc6..a5a9f56c2d 100644
--- a/pkg/kafka/partition/offset_manager.go
+++ b/pkg/kafka/partition/offset_manager.go
@@ -18,61 +18,12 @@ import (
 	"github.com/grafana/loki/v3/pkg/kafka/client"
 )
 
-// Partition level metadata in a more easily digestible form than what Kafka provides
-type Lag struct {
-	// First Available Offset in retention
-	startOffset int64
-	// Exclusive; the next available offset (as of yet unwritten)
-	endOffset int64
-	// Last committed offset
-	committedOffset int64
-	// rawLag measures how far behind the most recently committed offset is from the current offset.
-	// In special cases, this can be positive even when there are no more records to process,
-	// which happens when there is a gap between the last committed offset and the current offset, but
-	// it is out of retention (unrecoverable).
-	rawLag int64
-}
-
-func NewLag(startOffset, endOffset, committedOffset, rawLag int64) Lag {
-	return Lag{
-		startOffset: startOffset,
-		endOffset: endOffset,
-		committedOffset: committedOffset,
-		rawLag: rawLag,
-	}
-}
-
-// FirstUncommittedOffset returns the first offset that has not yet been committed
-func (l Lag) FirstUncommittedOffset() int64 {
-	// startOffset is the previously-committed offset, so we need to start processing the first
-	// _uncommitted_ offset
-	return max(l.committedOffset+1, l.startOffset)
-}
-
-func (l Lag) LastCommittedOffset() int64 {
-	return l.committedOffset
-}
-
-// NextAvailableOffset returns the next unwritten offset in a partition,
-// i.e. the end offset (exclusive)
-func (l Lag) NextAvailableOffset() int64 {
-	return l.endOffset
-}
-
-// Lag returns the difference between the last produced offset
-// and the first Uncommitted (but available) offset
-func (l Lag) Lag() int64 {
-	return l.endOffset - l.FirstUncommittedOffset()
-}
-
 type OffsetManager interface {
 	Topic() string
 	ConsumerGroup() string
 
-	// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
-	GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error)
-	FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error)
-	FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
+	LastCommittedOffset(ctx context.Context, partition int32) (int64, error)
+	PartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
 	NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error)
 	Commit(ctx context.Context, partition int32, offset int64) error
 }
@@ -157,8 +108,8 @@ func (r *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t
 	return listed.Offset, nil
 }
 
-// FetchLastCommittedOffset retrieves the last committed offset for this partition
-func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) {
+// LastCommittedOffset retrieves the last committed offset for this partition
+func (r *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) {
 	req := kmsg.NewPtrOffsetFetchRequest()
 	req.Topics = []kmsg.OffsetFetchRequestTopic{{
 		Topic: r.cfg.Topic,
@@ -203,7 +154,7 @@ func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, parti
 }
 
-// FetchPartitionOffset retrieves the offset for a specific position
-func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error) {
+// PartitionOffset retrieves the offset for a specific position
+func (r *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error) {
 	partitionReq := kmsg.NewListOffsetsRequestTopicPartition()
 	partitionReq.Partition = partitionID
 	partitionReq.Timestamp = int64(position)
@@ -248,40 +199,6 @@ func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partition
 	return partition.Offset, nil
 }
 
-// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
-func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error) {
-	lag, err := GetGroupLag(ctx, r.adminClient, r.cfg.Topic, r.ConsumerGroup(), fallbackOffsetMillis)
-	if err != nil {
-		return nil, err
-	}
-
-	offsets, ok := lag[r.cfg.Topic]
-	if !ok {
-		return nil, errors.New("no lag found for the topic")
-	}
-
-	res := make(map[int32]Lag, len(offsets))
-
-	for partition, partitionOffset := range offsets {
-		res[partition] = Lag{
-			// 1. kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
-			// no additional validation is needed here
-			// 2. committed offset could be behind start offset if we are falling behind retention period.
-
-			// startOffset is the previously-committed offset, so we need to start processing the first
-			// _uncommitted_ offset
-			startOffset: max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset),
-			// endOffset is initially the next available offset: this is why we treat jobs as end-exclusive:
-			// so we won't try polling forever to a partition that won't have any more records
-			endOffset: partitionOffset.End.Offset,
-			committedOffset: partitionOffset.Commit.At,
-			rawLag: partitionOffset.Lag,
-		}
-	}
-
-	return res, nil
-}
-
 // Commit commits an offset to the consumer group
 func (r *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error {
 	admin := kadm.NewClient(r.client)
diff --git a/pkg/kafka/partition/reader_service.go b/pkg/kafka/partition/reader_service.go
index 83bc958093..f890efceae 100644
--- a/pkg/kafka/partition/reader_service.go
+++ b/pkg/kafka/partition/reader_service.go
@@ -143,7 +143,7 @@ func (s *ReaderService) starting(ctx context.Context) error {
 	logger := log.With(s.logger, "phase", phaseStarting)
 	s.reader.SetPhase(phaseStarting)
 	// Fetch the last committed offset to determine where to start reading
-	lastCommittedOffset, err := s.offsetManager.FetchLastCommittedOffset(ctx, s.partitionID)
+	lastCommittedOffset, err := s.offsetManager.LastCommittedOffset(ctx, s.partitionID)
 	if err != nil {
 		return fmt.Errorf("fetching last committed offset: %w", err)
 	}
@@ -235,14 +235,14 @@ func (s *ReaderService) fetchUntilLagSatisfied(
 
 	for b.Ongoing() {
 		// Send a direct request to the Kafka backend to fetch the partition start offset.
-		partitionStartOffset, err := s.offsetManager.FetchPartitionOffset(ctx, s.partitionID, KafkaStartOffset)
+		partitionStartOffset, err := s.offsetManager.PartitionOffset(ctx, s.partitionID, KafkaStartOffset)
 		if err != nil {
 			level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err)
 			b.Wait()
 			continue
 		}
 
-		consumerGroupLastCommittedOffset, err := s.offsetManager.FetchLastCommittedOffset(ctx, s.partitionID)
+		consumerGroupLastCommittedOffset, err := s.offsetManager.LastCommittedOffset(ctx, s.partitionID)
 		if err != nil {
 			level.Warn(logger).Log("msg", "partition reader failed to fetch last committed offset", "err", err)
 			b.Wait()
@@ -253,7 +253,7 @@ func (s *ReaderService) fetchUntilLagSatisfied(
 		// We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further
 		// latency.
 		lastProducedOffsetRequestedAt := time.Now()
-		lastProducedOffset, err := s.offsetManager.FetchPartitionOffset(ctx, s.partitionID, KafkaEndOffset)
+		lastProducedOffset, err := s.offsetManager.PartitionOffset(ctx, s.partitionID, KafkaEndOffset)
 		if err != nil {
 			level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err)
 			b.Wait()
diff --git a/pkg/limits/partition_lifecycler.go b/pkg/limits/partition_lifecycler.go
index 148fa328d3..aea81226f5 100644
--- a/pkg/limits/partition_lifecycler.go
+++ b/pkg/limits/partition_lifecycler.go
@@ -78,7 +78,7 @@ func (l *partitionLifecycler) determineStateFromOffsets(ctx context.Context, par
 	logger := log.With(l.logger, "partition", partition)
 	// Get the start offset for the partition. This can be greater than zero
 	// if a retention period has deleted old records.
-	startOffset, err := l.offsetManager.FetchPartitionOffset(
+	startOffset, err := l.offsetManager.PartitionOffset(
 		ctx, partition, kafka_partition.KafkaStartOffset)
 	if err != nil {
 		return fmt.Errorf("failed to get last produced offset: %w", err)
@@ -87,7 +87,7 @@ func (l *partitionLifecycler) determineStateFromOffsets(ctx context.Context, par
 	// record. For example, if a partition contains 1 record, then the last
 	// produced offset is 1. However, the offset of the last produced record
 	// is 0, as offsets start from 0.
-	lastProducedOffset, err := l.offsetManager.FetchPartitionOffset(
+	lastProducedOffset, err := l.offsetManager.PartitionOffset(
 		ctx, partition, kafka_partition.KafkaEndOffset)
 	if err != nil {
 		return fmt.Errorf("failed to get last produced offset: %w", err)
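
Note for callers that previously relied on the removed Lag helper and GroupLag: the same per-partition number can still be derived from the two remaining primitives. The sketch below is illustrative only and not part of this change; partitionLag (and the package name lagutil) is a hypothetical helper that simply reuses the removed code's arithmetic (first uncommitted offset = max(committed+1, start); lag = end offset minus that), assuming the partition package exports OffsetManager, KafkaStartOffset, and KafkaEndOffset as shown in the diff above.

package lagutil

import (
	"context"
	"fmt"

	kafka_partition "github.com/grafana/loki/v3/pkg/kafka/partition"
)

// partitionLag is a hypothetical helper (not part of this change). It derives
// a partition's lag from the renamed OffsetManager methods, mirroring the
// arithmetic of the removed Lag type.
func partitionLag(ctx context.Context, om kafka_partition.OffsetManager, p int32) (int64, error) {
	// First offset still in retention.
	start, err := om.PartitionOffset(ctx, p, kafka_partition.KafkaStartOffset)
	if err != nil {
		return 0, fmt.Errorf("fetching start offset: %w", err)
	}
	// Next offset to be produced (exclusive end).
	end, err := om.PartitionOffset(ctx, p, kafka_partition.KafkaEndOffset)
	if err != nil {
		return 0, fmt.Errorf("fetching end offset: %w", err)
	}
	// Last offset committed by the consumer group.
	committed, err := om.LastCommittedOffset(ctx, p)
	if err != nil {
		return 0, fmt.Errorf("fetching last committed offset: %w", err)
	}
	// committed is the previously-committed offset, so processing resumes at the
	// first uncommitted offset, clamped to what retention still holds.
	firstUncommitted := max(committed+1, start)
	return end - firstUncommitted, nil
}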