From d59a5e2cfbf5dab3d2af1d2d4f3dcb9f272cc48b Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Wed, 4 Dec 2024 13:46:24 +0530 Subject: [PATCH] chore: pass down reader metrics to avoid duplicate registration (#15246) --- pkg/kafka/partition/reader.go | 26 +++++++++++--------------- pkg/kafka/partition/reader_service.go | 3 ++- pkg/loki/modules.go | 3 ++- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go index 15e78c1946..58f9e77c4b 100644 --- a/pkg/kafka/partition/reader.go +++ b/pkg/kafka/partition/reader.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/plugin/kprom" "github.com/grafana/loki/v3/pkg/kafka" @@ -25,12 +26,6 @@ const ( KafkaEndOffset SpecialOffset = -1 ) -var rm *readerMetrics - -func init() { - rm = newReaderMetrics(prometheus.DefaultRegisterer) -} - type Record struct { // Context holds the tracing (and potentially other) info, that the record was enriched with on fetch from Kafka. Ctx context.Context @@ -47,18 +42,19 @@ type Reader interface { SetOffsetForConsumption(offset int64) } -// readerMetrics contains metrics specific to Kafka reading operations -type readerMetrics struct { +// ReaderMetrics contains metrics specific to Kafka reading operations +type ReaderMetrics struct { recordsPerFetch prometheus.Histogram fetchesErrors prometheus.Counter fetchesTotal prometheus.Counter fetchWaitDuration prometheus.Histogram receiveDelay prometheus.Histogram lastCommittedOffset prometheus.Gauge + kprom *kprom.Metrics } -func newReaderMetrics(r prometheus.Registerer) *readerMetrics { - return &readerMetrics{ +func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics { + return &ReaderMetrics{ fetchWaitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ Name: "loki_kafka_reader_fetch_wait_duration_seconds", Help: "How long the reader spent waiting for a batch of records from Kafka.", @@ -86,6 +82,7 @@ func newReaderMetrics(r prometheus.Registerer) *readerMetrics { NativeHistogramMinResetDuration: 1 * time.Hour, Buckets: prometheus.ExponentialBuckets(0.125, 2, 18), }), + kprom: client.NewReaderClientMetrics("partition-reader", r), } } @@ -95,7 +92,7 @@ type KafkaReader struct { topic string partitionID int32 consumerGroup string - metrics *readerMetrics + metrics *ReaderMetrics logger log.Logger } @@ -103,13 +100,12 @@ func NewKafkaReader( cfg kafka.Config, partitionID int32, logger log.Logger, - reg prometheus.Registerer, + metrics *ReaderMetrics, ) (*KafkaReader, error) { // Create a new Kafka client for this reader - clientMetrics := client.NewReaderClientMetrics("partition-reader", reg) c, err := client.NewReaderClient( cfg, - clientMetrics, + metrics.kprom, log.With(logger, "component", "kafka-client"), ) if err != nil { @@ -120,7 +116,7 @@ func NewKafkaReader( client: c, topic: cfg.Topic, partitionID: partitionID, - metrics: rm, + metrics: metrics, logger: logger, }, nil } diff --git a/pkg/kafka/partition/reader_service.go b/pkg/kafka/partition/reader_service.go index 4fba065c94..5c67cde5fd 100644 --- a/pkg/kafka/partition/reader_service.go +++ b/pkg/kafka/partition/reader_service.go @@ -74,11 +74,12 @@ func NewReaderService( logger log.Logger, reg prometheus.Registerer, ) (*ReaderService, error) { + readerMetrics := NewReaderMetrics(reg) reader, err := NewKafkaReader( kafkaCfg, partitionID, logger, - reg, + readerMetrics, ) if err != nil { return nil, fmt.Errorf("creating kafka reader: %w", err) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 6a88f719c6..fb5c800cc0 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1821,11 +1821,12 @@ func (t *Loki) initBlockBuilder() (services.Service, error) { return nil, fmt.Errorf("calculating block builder partition ID: %w", err) } + readerMetrics := partition.NewReaderMetrics(prometheus.DefaultRegisterer) reader, err := partition.NewKafkaReader( t.Cfg.KafkaConfig, ingestPartitionID, logger, - prometheus.DefaultRegisterer, + readerMetrics, ) if err != nil { return nil, err