|
|
|
@ -136,7 +136,8 @@ func (s *ReaderService) starting(ctx context.Context) error { |
|
|
|
|
"partition", s.reader.Partition(), |
|
|
|
|
"consumer_group", s.reader.ConsumerGroup(), |
|
|
|
|
) |
|
|
|
|
s.metrics.reportStarting(s.reader.Partition()) |
|
|
|
|
s.metrics.reportOwnerOfPartition(s.reader.Partition()) |
|
|
|
|
s.metrics.reportStarting() |
|
|
|
|
|
|
|
|
|
// Fetch the last committed offset to determine where to start reading
|
|
|
|
|
lastCommittedOffset, err := s.reader.FetchLastCommittedOffset(ctx) |
|
|
|
@ -196,7 +197,7 @@ func (s *ReaderService) running(ctx context.Context) error { |
|
|
|
|
"partition", s.reader.Partition(), |
|
|
|
|
"consumer_group", s.reader.ConsumerGroup(), |
|
|
|
|
) |
|
|
|
|
s.metrics.reportRunning(s.reader.Partition()) |
|
|
|
|
s.metrics.reportRunning() |
|
|
|
|
|
|
|
|
|
consumer, err := s.consumerFactory(s.committer) |
|
|
|
|
if err != nil { |
|
|
|
@ -396,14 +397,16 @@ func (s *ReaderService) startFetchLoop(ctx context.Context) chan []Record { |
|
|
|
|
return records |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *serviceMetrics) reportStarting(partition int32) { |
|
|
|
|
s.partition.WithLabelValues(strconv.Itoa(int(partition))).Set(1) |
|
|
|
|
func (s *serviceMetrics) reportOwnerOfPartition(id int32) { |
|
|
|
|
s.partition.WithLabelValues(strconv.Itoa(int(id))).Set(1) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *serviceMetrics) reportStarting() { |
|
|
|
|
s.phase.WithLabelValues(phaseStarting).Set(1) |
|
|
|
|
s.phase.WithLabelValues(phaseRunning).Set(0) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *serviceMetrics) reportRunning(partition int32) { |
|
|
|
|
s.partition.WithLabelValues(strconv.Itoa(int(partition))).Set(1) |
|
|
|
|
func (s *serviceMetrics) reportRunning() { |
|
|
|
|
s.phase.WithLabelValues(phaseStarting).Set(0) |
|
|
|
|
s.phase.WithLabelValues(phaseRunning).Set(1) |
|
|
|
|
} |
|
|
|
|