fix: revoke partitions if lost (#20030)

pull/20007/head
George Robinson 6 months ago committed by GitHub
parent 960df88bab
commit 1ac5d1f0d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 29
      pkg/limits/partition_lifecycler.go

@ -74,30 +74,29 @@ func (l *partitionLifecycler) Assign(ctx context.Context, _ *kgo.Client, topics
}
// Revoke implements kgo.OnPartitionsRevoked.
func (l *partitionLifecycler) Revoke(_ context.Context, _ *kgo.Client, topics map[string][]int32) {
func (l *partitionLifecycler) Revoke(ctx context.Context, client *kgo.Client, topics map[string][]int32) {
l.revoke(ctx, client, topics)
}
// Lost implements kgo.OnPartitionsLost.
func (l *partitionLifecycler) Lost(ctx context.Context, client *kgo.Client, topics map[string][]int32) {
l.revoke(ctx, client, topics)
}
// Revokes all partitions in topics. It expects just one topic and panics if
// topics contains more than one topic.
func (l *partitionLifecycler) revoke(_ context.Context, _ *kgo.Client, topics map[string][]int32) {
if len(topics) > 1 {
panic(fmt.Sprintf("expected one topic, received %d topics", len(topics)))
}
// We expect just one topic, and panic if topics contains more than one
// topic. The range over topics just makes it easier to access the first
// value in a map containing a single key.
// The range over topics just makes it easier to access the first value
// in a map containing a single key.
for _, partitions := range topics {
l.partitionManager.Revoke(partitions)
l.usage.EvictPartitions(partitions)
}
}
// Lost implements kgo.OnPartitionsLost.
func (l *partitionLifecycler) Lost(_ context.Context, _ *kgo.Client, topics map[string][]int32) {
for _, partitions := range topics {
for _, partition := range partitions {
// TODO(grobinson): Implement logic to handle partition loss.
// For now, we just log the event to measure if it happens at all.
level.Warn(l.logger).Log("msg", "partition lost", "partition", partition)
}
}
}
func (l *partitionLifecycler) determineStateFromOffsets(ctx context.Context, partition int32) error {
logger := log.With(l.logger, "partition", partition)
// Get the start offset for the partition. This can be greater than zero

Loading…
Cancel
Save