chore: pass down reader metrics to avoid duplicate registration (#15246)

pull/15224/head
Ashwanth 2 years ago committed by GitHub
parent e0cf6da466
commit d59a5e2cfb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 26
      pkg/kafka/partition/reader.go
  2. 3
      pkg/kafka/partition/reader_service.go
  3. 3
      pkg/loki/modules.go

@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kprom"
"github.com/grafana/loki/v3/pkg/kafka"
@ -25,12 +26,6 @@ const (
KafkaEndOffset SpecialOffset = -1
)
var rm *readerMetrics
func init() {
rm = newReaderMetrics(prometheus.DefaultRegisterer)
}
type Record struct {
// Context holds the tracing (and potentially other) info, that the record was enriched with on fetch from Kafka.
Ctx context.Context
@ -47,18 +42,19 @@ type Reader interface {
SetOffsetForConsumption(offset int64)
}
// readerMetrics contains metrics specific to Kafka reading operations
type readerMetrics struct {
// ReaderMetrics contains metrics specific to Kafka reading operations
type ReaderMetrics struct {
recordsPerFetch prometheus.Histogram
fetchesErrors prometheus.Counter
fetchesTotal prometheus.Counter
fetchWaitDuration prometheus.Histogram
receiveDelay prometheus.Histogram
lastCommittedOffset prometheus.Gauge
kprom *kprom.Metrics
}
func newReaderMetrics(r prometheus.Registerer) *readerMetrics {
return &readerMetrics{
func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics {
return &ReaderMetrics{
fetchWaitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_kafka_reader_fetch_wait_duration_seconds",
Help: "How long the reader spent waiting for a batch of records from Kafka.",
@ -86,6 +82,7 @@ func newReaderMetrics(r prometheus.Registerer) *readerMetrics {
NativeHistogramMinResetDuration: 1 * time.Hour,
Buckets: prometheus.ExponentialBuckets(0.125, 2, 18),
}),
kprom: client.NewReaderClientMetrics("partition-reader", r),
}
}
@ -95,7 +92,7 @@ type KafkaReader struct {
topic string
partitionID int32
consumerGroup string
metrics *readerMetrics
metrics *ReaderMetrics
logger log.Logger
}
@ -103,13 +100,12 @@ func NewKafkaReader(
cfg kafka.Config,
partitionID int32,
logger log.Logger,
reg prometheus.Registerer,
metrics *ReaderMetrics,
) (*KafkaReader, error) {
// Create a new Kafka client for this reader
clientMetrics := client.NewReaderClientMetrics("partition-reader", reg)
c, err := client.NewReaderClient(
cfg,
clientMetrics,
metrics.kprom,
log.With(logger, "component", "kafka-client"),
)
if err != nil {
@ -120,7 +116,7 @@ func NewKafkaReader(
client: c,
topic: cfg.Topic,
partitionID: partitionID,
metrics: rm,
metrics: metrics,
logger: logger,
}, nil
}

@ -74,11 +74,12 @@ func NewReaderService(
logger log.Logger,
reg prometheus.Registerer,
) (*ReaderService, error) {
readerMetrics := NewReaderMetrics(reg)
reader, err := NewKafkaReader(
kafkaCfg,
partitionID,
logger,
reg,
readerMetrics,
)
if err != nil {
return nil, fmt.Errorf("creating kafka reader: %w", err)

@ -1821,11 +1821,12 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
return nil, fmt.Errorf("calculating block builder partition ID: %w", err)
}
readerMetrics := partition.NewReaderMetrics(prometheus.DefaultRegisterer)
reader, err := partition.NewKafkaReader(
t.Cfg.KafkaConfig,
ingestPartitionID,
logger,
prometheus.DefaultRegisterer,
readerMetrics,
)
if err != nil {
return nil, err

Loading…
Cancel
Save