|
|
|
@ -3,6 +3,7 @@ package partition |
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"fmt" |
|
|
|
|
"math" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/go-kit/log" |
|
|
|
@ -46,7 +47,7 @@ type Reader interface { |
|
|
|
|
|
|
|
|
|
// ReaderMetrics contains metrics specific to Kafka reading operations
|
|
|
|
|
type ReaderMetrics struct { |
|
|
|
|
consumptionLag *prometheus.GaugeVec |
|
|
|
|
consumptionLag *prometheus.HistogramVec |
|
|
|
|
recordsPerFetch prometheus.Histogram |
|
|
|
|
fetchesErrors prometheus.Counter |
|
|
|
|
fetchesTotal prometheus.Counter |
|
|
|
@ -56,9 +57,14 @@ type ReaderMetrics struct { |
|
|
|
|
|
|
|
|
|
func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics { |
|
|
|
|
return &ReaderMetrics{ |
|
|
|
|
consumptionLag: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ |
|
|
|
|
Name: "loki_kafka_reader_consumption_lag_seconds", |
|
|
|
|
Help: "The estimated consumption lag in seconds, measured as the difference between the current time and the timestamp of the record.", |
|
|
|
|
consumptionLag: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ |
|
|
|
|
Name: "loki_kafka_reader_consumption_lag_seconds", |
|
|
|
|
Help: "The estimated consumption lag in seconds, measured as the difference between the current time and the timestamp of the record.", |
|
|
|
|
NativeHistogramZeroThreshold: math.Pow(2, -10), |
|
|
|
|
NativeHistogramBucketFactor: 1.2, |
|
|
|
|
NativeHistogramMaxBucketNumber: 100, |
|
|
|
|
NativeHistogramMinResetDuration: 1 * time.Hour, |
|
|
|
|
Buckets: prometheus.ExponentialBuckets(0.125, 2, 18), |
|
|
|
|
}, []string{"phase"}), |
|
|
|
|
fetchWaitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ |
|
|
|
|
Name: "loki_kafka_reader_fetch_wait_duration_seconds", |
|
|
|
@ -146,7 +152,7 @@ func (r *KafkaReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, e |
|
|
|
|
var numRecords int |
|
|
|
|
fetches.EachRecord(func(record *kgo.Record) { |
|
|
|
|
numRecords++ |
|
|
|
|
r.metrics.consumptionLag.WithLabelValues(r.phase).Set(time.Since(record.Timestamp).Seconds()) |
|
|
|
|
r.metrics.consumptionLag.WithLabelValues(r.phase).Observe(time.Since(record.Timestamp).Seconds()) |
|
|
|
|
}) |
|
|
|
|
r.metrics.recordsPerFetch.Observe(float64(numRecords)) |
|
|
|
|
|
|
|
|
|