From 01cf36df78d53abedd76eb5162df88cff3017895 Mon Sep 17 00:00:00 2001 From: aarogoss <32306622+aarogoss@users.noreply.github.com> Date: Wed, 23 Apr 2025 06:50:20 -0600 Subject: [PATCH] feat: adds partition-ingester push latency histogram (#17385) --- pkg/ingester/kafka_consumer.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/kafka_consumer.go b/pkg/ingester/kafka_consumer.go index a145f1e0e4..4b28894aff 100644 --- a/pkg/ingester/kafka_consumer.go +++ b/pkg/ingester/kafka_consumer.go @@ -22,6 +22,7 @@ import ( type consumerMetrics struct { consumeLatency prometheus.Histogram currentOffset prometheus.Gauge + pushLatency prometheus.Histogram } // newConsumerMetrics initializes and returns a new consumerMetrics instance @@ -36,6 +37,11 @@ func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics { Name: "loki_ingester_partition_current_offset", Help: "The current offset of the Kafka ingester consumer.", }), + pushLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "loki_ingester_partition_push_latency_seconds", + Help: "The latency of a push request after consuming from Kafka", + NativeHistogramBucketFactor: 1.1, + }), } } @@ -114,7 +120,12 @@ func (kc *kafkaConsumer) consume(ctx context.Context, records []partition.Record Streams: []logproto.Stream{stream}, } if err := retryWithBackoff(ctx, func(attempts int) error { - if _, err := kc.pusher.Push(recordCtx, req); err != nil { + pushTime := time.Now() + _, err := kc.pusher.Push(recordCtx, req) + + kc.metrics.pushLatency.Observe(time.Since(pushTime).Seconds()) + + if err != nil { level.Warn(kc.logger).Log("msg", "failed to push records", "err", err, "offset", record.Offset, "attempts", attempts) return err }