|
|
|
|
@ -18,61 +18,12 @@ import ( |
|
|
|
|
"github.com/grafana/loki/v3/pkg/kafka/client" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// Lag holds partition level metadata in a more easily digestible form than
// what Kafka provides.
type Lag struct {
	// startOffset is the first available offset in retention.
	startOffset int64

	// endOffset is exclusive: it is the next available offset, i.e. one that
	// has not been written yet.
	endOffset int64

	// committedOffset is the last committed offset.
	committedOffset int64

	// rawLag measures how far the most recently committed offset is behind
	// the current offset.
	//
	// In special cases it can be positive even though there are no more
	// records to process: this happens when the gap between the last
	// committed offset and the current offset is out of retention
	// (unrecoverable).
	rawLag int64
}
|
|
|
|
|
|
|
|
|
func NewLag(startOffset, endOffset, committedOffset, rawLag int64) Lag { |
|
|
|
|
return Lag{ |
|
|
|
|
startOffset: startOffset, |
|
|
|
|
endOffset: endOffset, |
|
|
|
|
committedOffset: committedOffset, |
|
|
|
|
rawLag: rawLag, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// FirstUncommittedOffset returns the first offset that has not yet been committed
|
|
|
|
|
func (l Lag) FirstUncommittedOffset() int64 { |
|
|
|
|
// startOffset is the previously-committed offset, so we need to start processing the first
|
|
|
|
|
// _uncommitted_ offset
|
|
|
|
|
return max(l.committedOffset+1, l.startOffset) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l Lag) LastCommittedOffset() int64 { |
|
|
|
|
return l.committedOffset |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NextAvailableOffset returns the next unwritten offset in a partition,
|
|
|
|
|
// i.e. the end offset (exclusive)
|
|
|
|
|
func (l Lag) NextAvailableOffset() int64 { |
|
|
|
|
return l.endOffset |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Lag returns the difference between the last produced offset
|
|
|
|
|
// and the first Uncommitted (but available) offset
|
|
|
|
|
func (l Lag) Lag() int64 { |
|
|
|
|
return l.endOffset - l.FirstUncommittedOffset() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type OffsetManager interface { |
|
|
|
|
Topic() string |
|
|
|
|
ConsumerGroup() string |
|
|
|
|
|
|
|
|
|
// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
|
|
|
|
|
GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error) |
|
|
|
|
FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error) |
|
|
|
|
FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error) |
|
|
|
|
LastCommittedOffset(ctx context.Context, partition int32) (int64, error) |
|
|
|
|
PartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error) |
|
|
|
|
NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error) |
|
|
|
|
Commit(ctx context.Context, partition int32, offset int64) error |
|
|
|
|
} |
|
|
|
|
@ -157,8 +108,8 @@ func (r *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t |
|
|
|
|
return listed.Offset, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// FetchLastCommittedOffset retrieves the last committed offset for this partition
|
|
|
|
|
func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) { |
|
|
|
|
// LastCommittedOffset retrieves the last committed offset for this partition
|
|
|
|
|
func (r *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) { |
|
|
|
|
req := kmsg.NewPtrOffsetFetchRequest() |
|
|
|
|
req.Topics = []kmsg.OffsetFetchRequestTopic{{ |
|
|
|
|
Topic: r.cfg.Topic, |
|
|
|
|
@ -203,7 +154,7 @@ func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, parti |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// FetchPartitionOffset retrieves the offset for a specific position
|
|
|
|
|
func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error) { |
|
|
|
|
func (r *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error) { |
|
|
|
|
partitionReq := kmsg.NewListOffsetsRequestTopicPartition() |
|
|
|
|
partitionReq.Partition = partitionID |
|
|
|
|
partitionReq.Timestamp = int64(position) |
|
|
|
|
@ -248,40 +199,6 @@ func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partition |
|
|
|
|
return partition.Offset, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
|
|
|
|
|
func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error) { |
|
|
|
|
lag, err := GetGroupLag(ctx, r.adminClient, r.cfg.Topic, r.ConsumerGroup(), fallbackOffsetMillis) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
offsets, ok := lag[r.cfg.Topic] |
|
|
|
|
if !ok { |
|
|
|
|
return nil, errors.New("no lag found for the topic") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
res := make(map[int32]Lag, len(offsets)) |
|
|
|
|
|
|
|
|
|
for partition, partitionOffset := range offsets { |
|
|
|
|
res[partition] = Lag{ |
|
|
|
|
// 1. kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
|
|
|
|
|
// no additional validation is needed here
|
|
|
|
|
// 2. committed offset could be behind start offset if we are falling behind retention period.
|
|
|
|
|
|
|
|
|
|
// startOffset is the previously-committed offset, so we need to start processing the first
|
|
|
|
|
// _uncommitted_ offset
|
|
|
|
|
startOffset: max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset), |
|
|
|
|
// endOffset is initially the next available offset: this is why we treat jobs as end-exclusive:
|
|
|
|
|
// so we won't try polling forever to a partition that won't have any more records
|
|
|
|
|
endOffset: partitionOffset.End.Offset, |
|
|
|
|
committedOffset: partitionOffset.Commit.At, |
|
|
|
|
rawLag: partitionOffset.Lag, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return res, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Commit commits an offset to the consumer group
|
|
|
|
|
func (r *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error { |
|
|
|
|
admin := kadm.NewClient(r.client) |
|
|
|
|
|