|
|
|
|
@ -79,7 +79,7 @@ type kafkaConsumer struct { |
|
|
|
|
metrics *consumerMetrics |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() { |
|
|
|
|
func (kc *kafkaConsumer) Start(ctx context.Context, recordsCh <-chan []partition.Record) func() { |
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
wg.Add(1) |
|
|
|
|
go func() { |
|
|
|
|
@ -87,13 +87,16 @@ func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partiti |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
// It can happen that the context is canceled while there are unprocessed records
|
|
|
|
|
// in the channel. However, we do not need to process all remaining records,
|
|
|
|
|
// and can exit out instead, as partition offsets are not committed until
|
|
|
|
|
// the record has been handed over to the Pusher and committed in the WAL.
|
|
|
|
|
level.Info(kc.logger).Log("msg", "shutting down kafka consumer") |
|
|
|
|
// We have been asked to exit, even if we haven't processed
|
|
|
|
|
// all records. No unprocessed records will be committed.
|
|
|
|
|
level.Info(kc.logger).Log("msg", "stopping, context canceled") |
|
|
|
|
return |
|
|
|
|
case records := <-recordsChan: |
|
|
|
|
case records, ok := <-recordsCh: |
|
|
|
|
if !ok { |
|
|
|
|
// All records have been processed, we can exit.
|
|
|
|
|
level.Info(kc.logger).Log("msg", "stopping, channel closed") |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
kc.consume(ctx, records) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|