diff --git a/pkg/limits/partition_lifecycler.go b/pkg/limits/partition_lifecycler.go index d267ecc339..2e60bbc757 100644 --- a/pkg/limits/partition_lifecycler.go +++ b/pkg/limits/partition_lifecycler.go @@ -74,30 +74,29 @@ func (l *partitionLifecycler) Assign(ctx context.Context, _ *kgo.Client, topics } // Revoke implements kgo.OnPartitionsRevoked. -func (l *partitionLifecycler) Revoke(_ context.Context, _ *kgo.Client, topics map[string][]int32) { +func (l *partitionLifecycler) Revoke(ctx context.Context, client *kgo.Client, topics map[string][]int32) { + l.revoke(ctx, client, topics) +} + +// Lost implements kgo.OnPartitionsLost. +func (l *partitionLifecycler) Lost(ctx context.Context, client *kgo.Client, topics map[string][]int32) { + l.revoke(ctx, client, topics) +} + +// Revokes all partitions in topics. It expects just one topic and panics if +// topics contains more than one topic. +func (l *partitionLifecycler) revoke(_ context.Context, _ *kgo.Client, topics map[string][]int32) { if len(topics) > 1 { panic(fmt.Sprintf("expected one topic, received %d topics", len(topics))) } - // We expect just one topic, and panic if topics contains more than one - // topic. The range over topics just makes it easier to access the first - // value in a map containing a single key. + // The range over topics just makes it easier to access the first value + // in a map containing a single key. for _, partitions := range topics { l.partitionManager.Revoke(partitions) l.usage.EvictPartitions(partitions) } } -// Lost implements kgo.OnPartitionsLost. -func (l *partitionLifecycler) Lost(_ context.Context, _ *kgo.Client, topics map[string][]int32) { - for _, partitions := range topics { - for _, partition := range partitions { - // TODO(grobinson): Implement logic to handle partition loss. - // For now, we just log the event to measure if it happens at all. - level.Warn(l.logger).Log("msg", "partition lost", "partition", partition) - } - } -} - func (l *partitionLifecycler) determineStateFromOffsets(ctx context.Context, partition int32) error { logger := log.With(l.logger, "partition", partition) // Get the start offset for the partition. This can be greater than zero