From acb8b89a2fd6141bbef253bc216b62778c5bb964 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 17 Sep 2025 11:25:01 +0100 Subject: [PATCH] fix: fix context canceled stops reader from cleaning up (#19210) --- pkg/ingester/kafka_consumer.go | 17 ++++++++++------- pkg/kafka/partition/reader_service.go | 4 +--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/ingester/kafka_consumer.go b/pkg/ingester/kafka_consumer.go index 983aa1716d..7319284fc5 100644 --- a/pkg/ingester/kafka_consumer.go +++ b/pkg/ingester/kafka_consumer.go @@ -79,7 +79,7 @@ type kafkaConsumer struct { metrics *consumerMetrics } -func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() { +func (kc *kafkaConsumer) Start(ctx context.Context, recordsCh <-chan []partition.Record) func() { var wg sync.WaitGroup wg.Add(1) go func() { @@ -87,13 +87,16 @@ func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partiti for { select { case <-ctx.Done(): - // It can happen that the context is canceled while there are unprocessed records - // in the channel. However, we do not need to process all remaining records, - // and can exit out instead, as partition offsets are not committed until - // the record has been handed over to the Pusher and committed in the WAL. - level.Info(kc.logger).Log("msg", "shutting down kafka consumer") + // We have been asked to exit, even if we haven't processed + // all records. No unprocessed records will be committed. + level.Info(kc.logger).Log("msg", "stopping, context canceled") return - case records := <-recordsChan: + case records, ok := <-recordsCh: + if !ok { + // All records have been processed, we can exit. + level.Info(kc.logger).Log("msg", "stopping, channel closed") + return + } kc.consume(ctx, records) } } diff --git a/pkg/kafka/partition/reader_service.go b/pkg/kafka/partition/reader_service.go index 39b47639f1..83bc958093 100644 --- a/pkg/kafka/partition/reader_service.go +++ b/pkg/kafka/partition/reader_service.go @@ -200,12 +200,10 @@ func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context, logger return fmt.Errorf("failed to create consumer: %w", err) } - cancelCtx, cancel := context.WithCancel(ctx) recordsCh := make(chan []Record) - wait := consumer.Start(cancelCtx, recordsCh) + wait := consumer.Start(ctx, recordsCh) defer func() { close(recordsCh) - cancel() wait() }()