@@ -100,6 +100,9 @@ func (p *Reader) start(ctx context.Context) error {
 		return errors.Wrap(err, "creating kafka reader client")
 	}
 
+	p.metrics.phase.WithLabelValues("starting").Set(1)
+	p.metrics.phase.WithLabelValues("running").Set(0)
+
 	// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
 	lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
 	p.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
@@ -141,6 +144,9 @@ func (p *Reader) start(ctx context.Context) error {
 // data from Kafka, and send it to the consumer.
 func (p *Reader) run(ctx context.Context) error {
 	level.Info(p.logger).Log("msg", "starting partition reader", "partition", p.partitionID, "consumer_group", p.consumerGroup)
+	p.metrics.phase.WithLabelValues("starting").Set(0)
+	p.metrics.phase.WithLabelValues("running").Set(1)
+
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
@@ -513,6 +519,7 @@ func isErrFetch(fetch kgo.Fetch) bool {
 }
 
 type readerMetrics struct {
+	phase                    *prometheus.GaugeVec
 	receiveDelayWhenStarting prometheus.Observer
 	receiveDelayWhenRunning  prometheus.Observer
 	recordsPerFetch          prometheus.Histogram
@@ -538,6 +545,10 @@ func newReaderMetrics(reg prometheus.Registerer) readerMetrics {
 	}, []string{"phase"})
 
 	return readerMetrics{
+		phase: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "loki_ingest_storage_reader_phase",
+			Help: "The current phase of the consumer.",
+		}, []string{"phase"}),
 		receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"),
 		receiveDelayWhenRunning:  receiveDelay.WithLabelValues("running"),
 		kprom:                    client.NewReaderClientMetrics("partition-reader", reg),
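Taken together, the hunks register a new loki_ingest_storage_reader_phase gauge vector and flip it so that exactly one of the "starting"/"running" series is 1 at any time: start() sets starting=1/running=0, and run() inverts the pair once the read loop begins. The standalone Go sketch below illustrates that same flip pattern outside of Loki; the phaseMetrics type, the setPhase helper, and the main function are invented for illustration and are not part of the diff.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// phaseMetrics is a hypothetical stand-in for the readerMetrics struct
// in the diff; it holds only the phase gauge vector.
type phaseMetrics struct {
	phase *prometheus.GaugeVec
}

func newPhaseMetrics(reg prometheus.Registerer) phaseMetrics {
	return phaseMetrics{
		// Same registration shape as the diff: a GaugeVec keyed by a
		// single "phase" label.
		phase: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "loki_ingest_storage_reader_phase",
			Help: "The current phase of the consumer.",
		}, []string{"phase"}),
	}
}

// setPhase is a hypothetical helper (the diff sets each series inline):
// it flips the gauge so that only the active phase reports 1.
func (m phaseMetrics) setPhase(active string) {
	for _, p := range []string{"starting", "running"} {
		v := 0.0
		if p == active {
			v = 1.0
		}
		m.phase.WithLabelValues(p).Set(v)
	}
}

func main() {
	reg := prometheus.NewRegistry()
	m := newPhaseMetrics(reg)

	m.setPhase("starting") // mirrors start(): starting=1, running=0
	m.setPhase("running")  // mirrors run():   starting=0, running=1

	fmt.Println(testutil.ToFloat64(m.phase.WithLabelValues("running"))) // prints 1
}

One consequence of keeping the two series mutually exclusive is that their sum per reader should always be 1, so a dashboard can read off the current phase directly from whichever series is non-zero, and a missing or zero sum is an easy staleness signal.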